hotfix: fix the bug of triggerProcessLoop #50
Conversation
…sLoop, delete build dir when compile
… log4j in app; add api stop for IPluginEventListener to avoid losing data in queue
plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java
continue;
}
//check if it's json
JSONObject jsonObject = JSONObject.parseObject(triggerData);
Why is it necessary to verify each time if the data is in JSON format? Could this have an impact on performance?
We need to extract the triggerName from the JSON with minimal CPU overhead.
Using String.contains is logically incorrect, as another key may also contain this triggerName. We should match it exactly.
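A minimal, self-contained sketch of the difference between a substring check and an exact match on the parsed field. The `extractTriggerName` helper and the sample payload are illustrative stand-ins; the plugin itself uses fastjson's `JSONObject.parseObject(...)`:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TriggerNameMatch {
    public static void main(String[] args) {
        String data = "{\"otherKey\":\"blockTrigger2\",\"triggerName\":\"txTrigger\"}";
        // String.contains gives a false positive: "blockTrigger" appears
        // inside the value of an unrelated key.
        System.out.println(data.contains("blockTrigger"));   // true (wrong)
        // Matching the parsed triggerName field exactly is correct.
        String name = extractTriggerName(data);
        System.out.println("blockTrigger".equals(name));     // false (right)
    }

    // Minimal stand-in for JSONObject.parseObject(json).getString("triggerName");
    // a real implementation would use a JSON library such as fastjson.
    static String extractTriggerName(String json) {
        Matcher m = Pattern
            .compile("\"triggerName\"\\s*:\\s*\"([^\"]*)\"")
            .matcher(json);
        return m.find() ? m.group(1) : null;
    }
}
```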
plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaSenderImpl.java
this.name = name;
}

public static EventTopic getEventTopicByType(int value) {
What about using a map instead of a for loop? For example:
private static final Map<Integer, EventTopic> TYPE_MAP = Arrays.stream(values())
.collect(Collectors.toMap(EventTopic::getType, Function.identity()));
private static final Map<String, EventTopic> NAME_MAP = Arrays.stream(values())
.collect(Collectors.toMap(EventTopic::getName, Function.identity()));
public static EventTopic getEventTopicByType(int value) {
return TYPE_MAP.get(value); // O(1)
}
public static EventTopic getEventTopicByName(String topicName) {
return NAME_MAP.get(topicName); // O(1)
}
Actually, setTopic is only used once, when the plugin starts, so there is no need to optimize it.
plugins/kafkaplugin/src/main/java/org/tron/eventplugin/KafkaSenderImpl.java
plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java
};

@Override
public void close() {
Should we interrupt the triggerProcessThread that is started in init? For example:
isRunTriggerProcessThread = false;
if (triggerProcessThread != null) {
triggerProcessThread.interrupt();
try {
triggerProcessThread.join(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
That's an alternative way to stop the thread. Accepted. triggerProcessThread is used for transferring data between two queues; it performs no I/O and is extremely fast.
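As a rough sketch of what such a queue-to-queue transfer loop can look like when it honors both the running flag and interrupt() (class, field, and method names here are illustrative, not the plugin's actual code):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class TransferLoop {
    // volatile so close() on another thread can stop the loop promptly
    static volatile boolean isRunTriggerProcessThread = true;

    public static void transfer(BlockingQueue<String> in, BlockingQueue<String> out) {
        while (isRunTriggerProcessThread) {
            try {
                // poll with a timeout so the flag is re-checked periodically
                String data = in.poll(1, TimeUnit.SECONDS);
                if (data != null) {
                    out.put(data);
                }
            } catch (InterruptedException e) {
                // restore the interrupt status and exit instead of swallowing it
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
```

A thread running this loop stops either when the flag is cleared or as soon as interrupt() lands during the blocking poll, which is why interrupting in close() makes shutdown prompt.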
};

@Override
public void close() {
Should we set isRunTriggerProcessThread = false first?
That's an alternative way to stop the thread. Accepted.
@Override
public void close() {
log.info("Closing MongodbSender...");
if (triggerProcessThread != null) {
Add a try-catch?
if (triggerProcessThread != null) {
triggerProcessThread.interrupt();
try {
triggerProcessThread.join(5000);
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for triggerProcessThread to stop");
Thread.currentThread().interrupt();
}
}
Good suggestion. Transferring data between two queues is very quick; we wait 1000 ms at most.
service.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
log.warn("Mongo triggerProcessThread did not terminate");
Update the log message to "Mongo service thread pool did not terminate".
Good suggestion. Fixed.
plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java
…nection; wait threads stop for milliseconds
Thread.currentThread().interrupt();
}
if (mongoManager != null) {
mongoManager.close();
If the triggerQueue contains data, should we wait until the data is written to the database before shutting down the service? Otherwise, the data in the triggerQueue will be lost.
Before closing the mongo connection, we wait for the tasks to complete using this:
// Wait a while for existing tasks to terminate
if (!service.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
service.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!service.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
log.warn("Mongo service thread pool did not terminate");
}
}
The task writes data through the mongo collection.
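The fragment above can be made self-contained roughly like this. It is the standard two-phase ExecutorService shutdown pattern; the class and method names are illustrative, not the plugin's actual code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class PoolShutdown {
    // Returns true if the pool terminated within the allotted time.
    public static boolean shutdownGracefully(ExecutorService service, long waitSeconds)
            throws InterruptedException {
        service.shutdown();                 // stop accepting new tasks
        // Wait a while for existing tasks to terminate
        if (!service.awaitTermination(waitSeconds, TimeUnit.SECONDS)) {
            service.shutdownNow();          // cancel currently executing tasks
            // Wait a while for tasks to respond to being cancelled
            return service.awaitTermination(waitSeconds, TimeUnit.SECONDS);
        }
        return true;
    }
}
```

Calling this before closing the mongo connection gives queued write tasks a bounded window to drain rather than dropping them immediately.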
triggerProcessThread transfers data from triggerQueue to another queue. It's extremely quick, and 1000 milliseconds is long enough:
if (triggerProcessThread != null) {
triggerProcessThread.interrupt();
try {
triggerProcessThread.join(1000);
} catch (InterruptedException e) {
log.warn("Interrupted while waiting for triggerProcessThread to stop");
Thread.currentThread().interrupt();
}
}
Broadcast status may be relatively fast, but synchronizing historical data can take much longer. Additionally, network jitter can also cause significant latency.
You can restart the FullNode with an updated startSyncBlockNum obtained by querying mongo. Even if all the data is written in time, you still need to check it again.
We use the WriteConcern.JOURNALED option to ensure data is flushed to disk before the mongo connection is closed, which alleviates this problem.
plugins/mongodbplugin/src/main/java/org/tron/eventplugin/MongodbSenderImpl.java
waynercheung left a comment
LGTM
What does this PR do?
- Fix bugs in the triggerProcessLoop thread: content with the same topic name; check that the data is JSON before sending it to the server; triggerProcessLoop failed to exit when interrupted.
- Add latestSolidifiedBlockNumber for BlockLogTrigger (referred to in fix: add missing latestSolidifiedBlockNumber field to BlockLogTrigger #46); delete 3 redundant columns for BlockLogTrigger.
- Use EventTopic instead of Constant.
- Clean in gradle.
- Rename MessageSenderImpl to KafkaSenderImpl for consistency.

Why are these changes required?
This PR has been tested by:
Follow up
Extra details