Clustering Quartz Jobs

I was looking for a scale-out option for scheduled jobs and, having used Quartz previously, found that it is pretty easy to get clustering up and running. The only caveat is that it works only with the JDBC job store. The sample I tried is a straightforward job that just prints the current time and the ID of the scheduler instance that triggered it.

Sample Job:

import org.quartz.*;

@PersistJobDataAfterExecution
public class PrintJob implements Job {

   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
      try {
         System.out.println("Print : "+System.currentTimeMillis()+" , "+jobExecutionContext.getScheduler().getSchedulerInstanceId());
      } catch (SchedulerException e) {
         e.printStackTrace();
      }
   }
}

Sample Trigger:

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import java.io.*;
import java.util.Properties;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;

public class PrintScheduler {

	private Scheduler scheduler;
	public PrintScheduler(String instanceId) {
		try {
			Properties properties = loadProperties();
			properties.put("org.quartz.scheduler.instanceId",instanceId);
			scheduler = new StdSchedulerFactory(properties).getScheduler();
			scheduler.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	private Properties loadProperties() throws IOException {
		Properties properties = new Properties();
		try (InputStream fis = PrintScheduler.class.getResourceAsStream("quartz.properties")) {
			if (fis == null) {
				throw new FileNotFoundException("quartz.properties not found on the classpath");
			}
			properties.load(fis);
		}
		return properties;
	}

	public void schedule() throws SchedulerException {
		JobDetail job = newJob(PrintJob.class).withIdentity("printjob", "printjobgroup").build();
		Trigger trigger = TriggerBuilder.newTrigger().withIdentity("printTrigger", "printtriggergroup")
				.startNow().withSchedule(simpleSchedule().withIntervalInMilliseconds(100L).repeatForever()).build();
		scheduler.scheduleJob(job, trigger);
	}

	public void stopScheduler() throws SchedulerException {
		scheduler.shutdown();
	}

	public static void main(String[] args) {
		PrintScheduler printScheduler = new PrintScheduler(args[0]);
		try {
			// schedule() only needs to run once – the job is then stored in the job store,
			// so additional cluster nodes can start up and share the work without re-scheduling
//			printScheduler.schedule();
			Thread.sleep(60000L);
			printScheduler.stopScheduler();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

Please note that I have used Quartz 2.x for this example.

On the configuration side, it remains more or less the same as for a single node, with a couple of exceptions –

org.quartz.scheduler.instanceName = PRINT_SCHEDULER1
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 4
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

#specify the jobstore used
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties = false

#The datasource for the jobstore that is to be used
org.quartz.jobStore.dataSource = myDS

#quartz table prefixes in the database
org.quartz.jobStore.tablePrefix = qrtz_
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.isClustered = true
org.quartz.scheduler.instanceId = PRINT_SCHEDULER1

#The details of the datasource specified previously
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3307/blog_test
org.quartz.dataSource.myDS.user = root
org.quartz.dataSource.myDS.password = root
org.quartz.dataSource.myDS.maxConnections = 20

The cluster-specific configurations here are org.quartz.jobStore.isClustered and org.quartz.scheduler.instanceId. For a single-node instance, org.quartz.jobStore.isClustered is set to false; for a cluster setup, it is changed to true. The second property that needs to change is instanceId, a name/ID used to uniquely identify a scheduler instance within the cluster. It can be set to AUTO, in which case each scheduler instance is automatically assigned a unique value, or you can provide a value of your own (which I find useful since it helps me identify where a job is running). Either way, the uniqueness must be maintained.
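For example, a second node of the cluster could reuse the same configuration shown above and change only the instance ID (PRINT_SCHEDULER2 is just an illustrative name here; the PrintScheduler class above also lets you override it from the command line) –

#second node – everything else stays the same as the first node
org.quartz.jobStore.isClustered = true
org.quartz.scheduler.instanceId = PRINT_SCHEDULER2
#or let quartz assign a unique id automatically
#org.quartz.scheduler.instanceId = AUTO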

One of the requirements for this to work is that the clocks of the nodes running the scheduler instances are kept in sync, or there might be issues with the schedule. Also, there is no guarantee of equal load distribution amongst the nodes of the cluster. As per the documentation, Quartz prefers to run a job on the same node it ran on previously, as long as that node is not under load.

Code @ https://github.com/vageeshhoskere/blog/tree/master/quartz

JDBCJobStore in Quartz Scheduler

Note: Updated the post to reflect code from Quartz version 2.2.*

As mentioned in the previous post, a job store is used by the Quartz scheduler to keep track of its jobs, triggers, calendars and so on, and a JDBCJobStore is a way of maintaining these details in a database via JDBC. While the RAMJobStore keeps the job details in volatile memory, a JDBCJobStore ensures that the information on jobs, triggers, calendars etc. is still available after a downtime, so the jobs can be rescheduled once the system is back up.

A JDBCJobStore requires a set of database tables to be present in the data source configured for Quartz. The SQL scripts for creating the required tables are available under the docs/dbTables folder of the Quartz distribution (for MySQL, e.g. tables_mysql_innodb.sql). The following example uses a MySQL database to schedule a job using the JDBCJobStore.

The file quartz.properties is used by the program to define the Quartz configuration –
Sample program – file quartz.properties

org.quartz.scheduler.instanceName = PRINT_SCHEDULER
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount = 4
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true

#specify the jobstore used
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.useProperties = false

#The datasource for the jobstore that is to be used
org.quartz.jobStore.dataSource = myDS

#quartz table prefixes in the database
org.quartz.jobStore.tablePrefix = qrtz_
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.isClustered = false
org.quartz.scheduler.instanceId = PRINT_SCHEDULER

#The details of the datasource specified previously
org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver
org.quartz.dataSource.myDS.URL = jdbc:mysql://localhost:3307/blog_test
org.quartz.dataSource.myDS.user = root
org.quartz.dataSource.myDS.password = root
org.quartz.dataSource.myDS.maxConnections = 20

Sample program PrintScheduler.java

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import ramstore.PrintStatefulJob;

import java.io.*;
import java.util.Properties;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;

public class PrintScheduler {

   private Scheduler scheduler;
   public PrintScheduler() {
      try {
         scheduler = new StdSchedulerFactory().getScheduler();
         scheduler.start();
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

   public void schedule() throws SchedulerException {
      JobDetail job = newJob(PrintStatefulJob.class).withIdentity("printjob", "printjobgroup").build();
      job.getJobDataMap().put("count",0);
      Trigger trigger = TriggerBuilder.newTrigger().withIdentity("printTrigger", "printtriggergroup")
            .startNow().withSchedule(simpleSchedule().withIntervalInMilliseconds(100L).repeatForever()).build();
      scheduler.scheduleJob(job, trigger);
   }

   public void stopScheduler() throws SchedulerException {
      scheduler.shutdown();
   }

   public static void main(String[] args) {
      PrintScheduler printScheduler = new PrintScheduler();
      try {
         // schedule() only needs to run once – the job is then persisted in the database;
         // scheduling the same job key again on a later run would fail
//       printScheduler.schedule();
         Thread.sleep(60000L);
         printScheduler.stopScheduler();
      } catch (Exception e) {
         e.printStackTrace();
      }
   }

}

PrintStatefulJob.java –

package ramstore;

import org.quartz.*;

@PersistJobDataAfterExecution
public class PrintStatefulJob implements Job{

   @Override
   public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
      JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
      Integer count = jobDataMap.getInt("count");
      count++;
      System.out.println("Printing count : "+count);
      jobDataMap.put("count",count);
   }

}

On a successful fire of the job, it can be observed that the job details and the job data blob are updated in the corresponding database tables.
A job thus stored in the database can be rescheduled whenever needed; rescheduling jobs from the JDBCJobStore is covered in a separate post.
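As a quick illustration (this class is not part of the repository – it is a minimal sketch assuming the same quartz.properties is on the classpath), a job that is already persisted in the JDBCJobStore can be looked up by its key and put on a new schedule using the standard Scheduler API:

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;

import static org.quartz.SimpleScheduleBuilder.simpleSchedule;

public class ReschedulePrintJob {

   public static void main(String[] args) throws SchedulerException {
      // the default factory reads quartz.properties from the classpath, so it points at the same JDBCJobStore
      Scheduler scheduler = new StdSchedulerFactory().getScheduler();
      scheduler.start();

      JobKey jobKey = JobKey.jobKey("printjob", "printjobgroup");
      TriggerKey triggerKey = TriggerKey.triggerKey("printTrigger", "printtriggergroup");

      if (scheduler.checkExists(jobKey)) {
         // build a replacement trigger for the stored job and swap it in for the existing one
         Trigger newTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).forJob(jobKey)
               .startNow().withSchedule(simpleSchedule().withIntervalInSeconds(1).repeatForever()).build();
         scheduler.rescheduleJob(triggerKey, newTrigger);
      }

      try {
         // let the rescheduled job fire for a while before shutting down; the schedule itself stays in the database
         Thread.sleep(10000L);
      } catch (InterruptedException ignored) {
      }
      scheduler.shutdown();
   }
}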

Code @ Github – https://github.com/vageeshhoskere/blog/tree/master/quartz

Data sharing between jobs – Stateful Jobs using Quartz Scheduler

Note: Updated the code to reflect quartz version 2.2.x

The data manipulated by a job can be persisted across its executions by making the job stateful. In Quartz 1.x this was done by implementing the StatefulJob interface; in Quartz 2.x that interface is deprecated in favour of the @PersistJobDataAfterExecution annotation used below. The sample program below shows how stateful jobs work in the Quartz scheduler.

The updated PrintStatefulJob.java-

package ramstore;

import org.quartz.*;

@PersistJobDataAfterExecution
public class PrintStatefulJob implements Job{

	@Override
	public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
		JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
		Integer count = jobDataMap.getInt("count");
		count++;
		System.out.println("Printing count : "+count);
		jobDataMap.put("count",count);
	}

}
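One detail worth noting with a trigger interval this short: @PersistJobDataAfterExecution only persists the JobDataMap after each execution, so concurrent executions of the same job can race on the count. Quartz provides @DisallowConcurrentExecution for exactly this situation, and its documentation recommends pairing the two annotations. A variant of the job above (the class name SafePrintStatefulJob is just illustrative, not part of the repository) would look like this –

package ramstore;

import org.quartz.*;

@PersistJobDataAfterExecution
@DisallowConcurrentExecution // only one execution of this JobDetail runs at a time, so the count cannot race
public class SafePrintStatefulJob implements Job {

	@Override
	public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
		JobDataMap jobDataMap = jobExecutionContext.getJobDetail().getJobDataMap();
		int count = jobDataMap.getInt("count") + 1;
		System.out.println("Printing count : " + count);
		jobDataMap.put("count", count);
	}

}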

The JobListener interface can be used to listen to job executions and take customized action around them. For example, the program below prints out the incremented count using the PrintJobListener class, which implements the JobListener interface –

PrintRamScheduler.java-

package ramstore;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.impl.matchers.KeyMatcher;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.simpleSchedule;

public class PrintRamScheduler {

	private Scheduler scheduler;
	public PrintRamScheduler() {
		try {
			//create scheduler factory
			scheduler = new StdSchedulerFactory().getScheduler();
			scheduler.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	public void schedule() throws SchedulerException {
		JobKey jobKey = JobKey.jobKey("printjob", "printjobgroup");
		//create a job
		JobDetail job = newJob(PrintStatefulJob.class).withIdentity(jobKey).build();
		//put a count variable that we can keep incrementing
		job.getJobDataMap().put("count",0);
		//create a trigger
		Trigger trigger = TriggerBuilder.newTrigger().withIdentity("printTrigger", "printtriggergroup")
				.startNow().withSchedule(simpleSchedule().withIntervalInMilliseconds(100L).repeatForever()).build();
		//schedule the job
		scheduler.scheduleJob(job, trigger);
		//add a listener only for this specific job - it is also possible to add a generic listener for all jobs
		scheduler.getListenerManager().addJobListener(new PrintJobListener(), KeyMatcher.keyEquals(jobKey));
	}

	public void stopScheduler() throws SchedulerException {
		//scheduler shutdown
		scheduler.shutdown();
	}

	public static void main(String[] args) {
		PrintRamScheduler printScheduler = new PrintRamScheduler();
		try {
			printScheduler.schedule();
			Thread.sleep(10000L);
			printScheduler.stopScheduler();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

}

PrintJobListener.java-

package ramstore;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.JobListener;

public class PrintJobListener implements JobListener {
	@Override
	public String getName() {
		return "PRINT_JOB_LISTENER";
	}

	@Override
	public void jobToBeExecuted(JobExecutionContext jobExecutionContext) {

	}

	@Override
	public void jobExecutionVetoed(JobExecutionContext jobExecutionContext) {
		//called when a job is vetoed via the TriggerListener#vetoJobExecution
		//details - http://www.quartz-scheduler.org/api/2.2.1/org/quartz/TriggerListener.html#vetoJobExecution(org.quartz.Trigger,%20org.quartz.JobExecutionContext)
	}

	@Override
	public void jobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException e) {
		System.out.println("Job was executed for count "
				+String.valueOf(jobExecutionContext.getMergedJobDataMap().get("count")));
	}
}

Code @ Git – https://github.com/vageeshhoskere/blog