Skip to content

Commit 5b64ea9

Browse files
committed
Started to implement a job system
1 parent 6f587d6 commit 5b64ea9

File tree

9 files changed

+323
-4
lines changed

9 files changed

+323
-4
lines changed

src/main/java/org/javawebstack/framework/WebApplication.java

Lines changed: 48 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import org.javawebstack.framework.bind.ModelBindTransformer;
88
import org.javawebstack.framework.command.*;
99
import org.javawebstack.framework.config.Config;
10+
import org.javawebstack.framework.job.*;
1011
import org.javawebstack.framework.module.Module;
1112
import org.javawebstack.framework.seed.FileSeeder;
1213
import org.javawebstack.framework.seed.MergedSeeder;
@@ -18,15 +19,14 @@
1819
import org.javawebstack.httpserver.transformer.response.JsonResponseTransformer;
1920
import org.javawebstack.injector.Injector;
2021
import org.javawebstack.injector.SimpleInjector;
22+
import org.javawebstack.orm.ORM;
23+
import org.javawebstack.orm.ORMConfig;
2124
import org.javawebstack.orm.exception.ORMConfigurationException;
2225
import org.javawebstack.orm.wrapper.MySQL;
2326
import org.javawebstack.orm.wrapper.SQL;
2427
import org.javawebstack.orm.wrapper.SQLite;
2528

26-
import java.util.ArrayList;
27-
import java.util.HashMap;
28-
import java.util.List;
29-
import java.util.Map;
29+
import java.util.*;
3030
import java.util.logging.Logger;
3131

3232
public abstract class WebApplication {
@@ -109,6 +109,7 @@ public WebApplication(){
109109
setupCommands(commandSystem);
110110
modules.forEach(m -> m.setupCommands(this, commandSystem));
111111
commandSystem.addCommand("start", new StartCommand());
112+
commandSystem.addCommand("worker", new WorkerCommand());
112113
commandSystem.addCommand("sh", new ShellCommand());
113114
commandSystem.addCommand("db", new MultiCommand()
114115
.add("migrate", new DBMigrateCommand())
@@ -125,6 +126,28 @@ public WebApplication(){
125126
);
126127
}
127128

129+
public void addDatabaseJobQueue(String name, boolean defaultQueue){
130+
addQueue(name, new DatabaseJobQueue(name), defaultQueue);
131+
}
132+
133+
public void enableDatabaseJobs(ORMConfig config) throws ORMConfigurationException {
134+
ORM.register(DatabaseQueuedJob.class, sql, config);
135+
}
136+
137+
public void addSyncJobQueue(String name, int capacity, boolean defaultQueue){
138+
addQueue(name, new SyncThreadedJobQueue(capacity), defaultQueue);
139+
}
140+
141+
public void addImmediateJobQueue(String name, boolean defaultQueue){
142+
addQueue(name, new ImmidiateJobQueue(), defaultQueue);
143+
}
144+
145+
public void addQueue(String name, JobQueue queue, boolean defaultQueue){
146+
injector.setInstance(JobQueue.class, name, queue);
147+
if(defaultQueue)
148+
injector.setInstance(JobQueue.class, "", queue);
149+
}
150+
128151
public WebApplication addModule(Module module){
129152
modules.add(module);
130153
return this;
@@ -202,4 +225,25 @@ public void start(){
202225
server.join();
203226
}
204227

228+
public void startWorker(String... queues){
229+
List<WorkerJobQueue> workerQueues = new ArrayList<>();
230+
for(String name : queues){
231+
JobQueue queue = injector.getInstance(JobQueue.class, name);
232+
if(queue == null)
233+
continue;
234+
if(queue instanceof WorkerJobQueue)
235+
workerQueues.add((WorkerJobQueue) queue);
236+
}
237+
if(workerQueues.size() == 0){
238+
logger.severe("No queue to process!");
239+
return;
240+
}
241+
UUID processUUID = UUID.randomUUID();
242+
logger.info("Running worker ("+processUUID.toString()+")");
243+
while (true){
244+
for(WorkerJobQueue queue : workerQueues)
245+
queue.process(processUUID);
246+
}
247+
}
248+
205249
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package org.javawebstack.framework.command;
2+
3+
import org.javawebstack.command.Command;
4+
import org.javawebstack.command.CommandResult;
5+
import org.javawebstack.command.CommandSystem;
6+
import org.javawebstack.framework.WebApplication;
7+
import org.javawebstack.injector.Inject;
8+
9+
import java.util.List;
10+
import java.util.Map;
11+
12+
public class WorkerCommand implements Command {
13+
@Inject
14+
private WebApplication application;
15+
public CommandResult execute(CommandSystem commandSystem, List<String> args, Map<String, List<String>> params) {
16+
if(args.size() == 0)
17+
return CommandResult.syntax("worker <...queues>");
18+
application.startWorker(args.toArray(new String[0]));
19+
return CommandResult.success();
20+
}
21+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package org.javawebstack.framework.job;
2+
3+
import org.javawebstack.graph.GraphElement;
4+
import org.javawebstack.graph.GraphMapper;
5+
import org.javawebstack.graph.NamingPolicy;
6+
import org.javawebstack.orm.Repo;
7+
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.UUID;
11+
12+
public class DatabaseJobQueue implements WorkerJobQueue {
13+
14+
private static final GraphMapper mapper = new GraphMapper().setNamingPolicy(NamingPolicy.SNAKE_CASE);
15+
private final String name;
16+
private final Map<Class<? extends Job>, String> typeMap = new HashMap<>();
17+
18+
public DatabaseJobQueue(String name){
19+
super();
20+
this.name = name;
21+
}
22+
23+
public String getName() {
24+
return name;
25+
}
26+
27+
public void queue(Job job){
28+
DatabaseQueuedJob queued = new DatabaseQueuedJob();
29+
queued.setType(getType(job));
30+
queued.setData(mapper.toGraph(job).toJsonString());
31+
queued.setMaxRetries(job.maxRetries());
32+
queued.setQueue(name);
33+
queued.save();
34+
}
35+
36+
public DatabaseQueuedJob pull(UUID processUUID){
37+
DatabaseQueuedJob job = Repo.get(DatabaseQueuedJob.class).where("queue", name).isNull("processUUID").get();
38+
if(job != null) {
39+
job.setProcessUUID(processUUID);
40+
job.save();
41+
job.refresh();
42+
if (processUUID.equals(job.getProcessUUID()))
43+
return job;
44+
}
45+
return null;
46+
}
47+
48+
public void process(UUID processUUID){
49+
DatabaseQueuedJob queuedJob = pull(processUUID);
50+
if(queuedJob != null){
51+
Job job = mapper.fromGraph(GraphElement.fromJson(queuedJob.getData()), getType(queuedJob.getType()));
52+
boolean success;
53+
try {
54+
success = job.run();
55+
}catch (Throwable t){
56+
t.printStackTrace();
57+
success = false;
58+
try {
59+
Thread.sleep(500);
60+
} catch (InterruptedException ignored) {}
61+
}
62+
if(success || queuedJob.getRetries() >= queuedJob.getMaxRetries()){
63+
queuedJob.delete();
64+
}else{
65+
queuedJob.setRetries(queuedJob.getRetries()+1);
66+
queuedJob.setProcessUUID(null);
67+
queuedJob.save();
68+
}
69+
}else{
70+
try {
71+
Thread.sleep(500);
72+
} catch (InterruptedException ignored) {}
73+
}
74+
}
75+
76+
public void register(String name, Class<? extends Job> type){
77+
typeMap.put(type, name);
78+
}
79+
80+
public void register(Class<? extends Job> type){
81+
register(type.getSimpleName(), type);
82+
}
83+
84+
public String getType(Class<? extends Job> type){
85+
return typeMap.get(type);
86+
}
87+
88+
public String getType(Job job){
89+
return getType(job.getClass());
90+
}
91+
92+
public Class<? extends Job> getType(String name){
93+
for(Class<? extends Job> type : typeMap.keySet()){
94+
if(typeMap.get(type).equals(name))
95+
return type;
96+
}
97+
return null;
98+
}
99+
100+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package org.javawebstack.framework.job;
2+
3+
import org.javawebstack.orm.Model;
4+
import org.javawebstack.orm.annotation.Column;
5+
import org.javawebstack.orm.annotation.Dates;
6+
import org.javawebstack.orm.annotation.Table;
7+
8+
import java.sql.Timestamp;
9+
import java.util.UUID;
10+
11+
@Dates @Table("jobs")
12+
public class DatabaseQueuedJob extends Model {
13+
14+
@Column
15+
private int id;
16+
@Column
17+
private String queue;
18+
@Column
19+
private UUID processUUID;
20+
@Column
21+
private String type;
22+
@Column
23+
private String data;
24+
@Column
25+
private Integer retries = 0;
26+
@Column
27+
private Integer maxRetries = 0;
28+
@Column
29+
private Timestamp createdAt;
30+
@Column
31+
private Timestamp updatedAt;
32+
33+
public int getId() {
34+
return id;
35+
}
36+
37+
public String getQueue(){
38+
return queue;
39+
}
40+
41+
public Timestamp getCreatedAt() {
42+
return createdAt;
43+
}
44+
45+
public Timestamp getUpdatedAt() {
46+
return updatedAt;
47+
}
48+
49+
public Integer getRetries() {
50+
return retries;
51+
}
52+
53+
public Integer getMaxRetries() {
54+
return maxRetries;
55+
}
56+
57+
public UUID getProcessUUID() {
58+
return processUUID;
59+
}
60+
61+
public String getType() {
62+
return type;
63+
}
64+
65+
public void setType(String type) {
66+
this.type = type;
67+
}
68+
69+
public String getData() {
70+
return data;
71+
}
72+
73+
public void setData(String data) {
74+
this.data = data;
75+
}
76+
77+
public void setMaxRetries(Integer maxRetries) {
78+
this.maxRetries = maxRetries;
79+
}
80+
81+
public void setProcessUUID(UUID processUUID) {
82+
this.processUUID = processUUID;
83+
}
84+
85+
public void setQueue(String queue) {
86+
this.queue = queue;
87+
}
88+
89+
public void setRetries(Integer retries) {
90+
this.retries = retries;
91+
}
92+
93+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package org.javawebstack.framework.job;
2+
3+
public class ImmidiateJobQueue implements JobQueue {
4+
public void run() {
5+
6+
}
7+
public void queue(Job job) {
8+
job.run();
9+
}
10+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.javawebstack.framework.job;
2+
3+
public interface Job {
4+
5+
boolean run();
6+
int maxRetries();
7+
8+
default void start(){
9+
new Thread(this::run).start();
10+
}
11+
12+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.javawebstack.framework.job;
2+
3+
public interface JobQueue {
4+
5+
void queue(Job job);
6+
7+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.javawebstack.framework.job;
2+
3+
import java.util.Queue;
4+
import java.util.concurrent.ArrayBlockingQueue;
5+
6+
public class SyncThreadedJobQueue implements JobQueue {
7+
private final Queue<Job> queue;
8+
public SyncThreadedJobQueue(int limit){
9+
queue = new ArrayBlockingQueue<>(limit);
10+
}
11+
public void queue(Job job) {
12+
queue.add(job);
13+
}
14+
public void run() {
15+
while (true){
16+
Job job = queue.poll();
17+
job.run();
18+
}
19+
}
20+
public void start(){
21+
new Thread(this).start();
22+
}
23+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package org.javawebstack.framework.job;
2+
3+
import java.util.UUID;
4+
5+
public interface WorkerJobQueue extends JobQueue {
6+
7+
void process(UUID processUUID);
8+
9+
}

0 commit comments

Comments
 (0)