Conversation
openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/RunLogBuffer.java
...ice/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexExecutor.java
openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/AppRunLogAppender.java
Pull request overview
This PR introduces infrastructure to capture and persist per-application-run logs (notably for reindex/SearchIndex runs) and improves SearchIndex stats accuracy/refresh behavior, backed by new unit tests.
Changes:
- Add a Logback `Appender` plus a buffered file writer to capture app-run logs into `logs/app-runs/{appName}/{timestamp}-{serverId}.log`.
- Wire the log-capture lifecycle into Quartz job execution via `OmAppJobListener` (start/stop capture, MDC setup/cleanup, thread-prefix capture for reindex workers).
- Improve SearchIndex stats consistency by adjusting totals when processed counts exceed initial totals and periodically syncing sink stats; add tests for these cases.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/OmAppJobListener.java | Starts/stops app-run log capture around Quartz job execution and manages MDC/thread-prefix capture. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/AppRunLogAppender.java | New Logback appender that routes matching events to per-run buffers/files and performs retention cleanup. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/logging/RunLogBuffer.java | New buffered writer with periodic flushing and max-line cap for app-run log persistence. |
| openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexExecutor.java | Periodic sink stat sync and total-records adjustments to keep stats internally consistent. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/logging/AppRunLogAppenderTest.java | New tests covering capture behavior, cleanup, listing timestamps, and concurrency safety of app-run logging. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/logging/RunLogBufferTest.java | New tests for buffer append/flush/close behavior and max-line handling. |
| openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/searchIndex/SearchIndexStatsTest.java | New tests validating adjusted totals and reader/job totals consistency. |
```java
for (ThreadPrefixBinding binding : threadPrefixBindings) {
  if (threadName.startsWith(binding.prefix)) {
    binding.buffer.append(formatLine(event));
    return;
  }
}
```
The thread-prefix capture path returns on the first matching ThreadPrefixBinding. If two captures are active with the same prefix (e.g., overlapping SearchIndex runs both using "reindex-"), all worker-thread logs will be routed to whichever binding was added first, and the other run will miss logs. Consider making prefixes unique per run or changing the logic to disambiguate (e.g., include run/job id in the prefix binding) rather than first-match wins.
Suggested change:
```java
ThreadPrefixBinding bestMatch = null;
int bestPrefixLength = -1;
for (ThreadPrefixBinding binding : threadPrefixBindings) {
  if (threadName.startsWith(binding.prefix)) {
    int prefixLength = binding.prefix.length();
    if (prefixLength > bestPrefixLength) {
      bestMatch = binding;
      bestPrefixLength = prefixLength;
    }
  }
}
if (bestMatch != null) {
  bestMatch.buffer.append(formatLine(event));
}
```
```java
static void resetForTest() {
  registered = false;
  threadPrefixBindings.clear();
}
```
resetForTest() only clears registered and threadPrefixBindings but leaves other static state mutated by tests (e.g., logDirectory, maxRunsPerApp, maxLinesPerRun) and does not remove the appender from the root logger if it was registered. This can leak state across test classes within the same JVM; consider resetting static defaults and detaching/stopping the root appender in resetForTest().
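A fuller reset might look like the following self-contained sketch. The field names and defaults (`logDirectory`, `maxRunsPerApp`, `"logs/app-runs"`, `10`) are illustrative assumptions modeled on this comment, not the appender's actual members, and the real method would additionally detach/stop the root-logger appender:

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained model of the appender's static state; field names and default
// values are assumptions for illustration, not the real class's members.
class ResetDemo {
  static final String DEFAULT_LOG_DIRECTORY = "logs/app-runs";
  static final int DEFAULT_MAX_RUNS_PER_APP = 10;

  static boolean registered = false;
  static final List<String> threadPrefixBindings = new ArrayList<>();
  static String logDirectory = DEFAULT_LOG_DIRECTORY;
  static int maxRunsPerApp = DEFAULT_MAX_RUNS_PER_APP;

  static void resetForTest() {
    registered = false;
    threadPrefixBindings.clear();
    // Restore every static knob tests may have mutated, not just the flags:
    logDirectory = DEFAULT_LOG_DIRECTORY;
    maxRunsPerApp = DEFAULT_MAX_RUNS_PER_APP;
    // The real implementation would also detach/stop the root-logger appender here.
  }
}
```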
```java
String appRunId = String.valueOf(runRecord.getTimestamp());
String serverId = ServerIdentityResolver.getInstance().getServerId();
MDC.put(AppRunLogAppender.MDC_APP_RUN_ID, appRunId);
MDC.put(AppRunLogAppender.MDC_APP_NAME, appName);
MDC.put(AppRunLogAppender.MDC_SERVER_ID, serverId);
MDC.put(AppRunLogAppender.MDC_APP_ID, jobApp.getId().toString());
String[] threadPrefixes = getThreadPrefixesForApp(appName);
AppRunLogAppender.startCapture(
    appRunId, jobApp.getId().toString(), appName, serverId, threadPrefixes);
dataMap.put(APP_RUN_LOG_ID, appRunId);
```
appRunLogId is stored in jobExecutionContext.getJobDetail().getJobDataMap(), and later read back from the same JobDetail map in cleanupLogCapture(). If Quartz ever allows overlapping executions of the same scheduled job (same JobDetail), the value can be overwritten by a newer run and the older run will stop the wrong capture (leaking the old buffer and truncating the new one). Store this per-execution (e.g., jobExecutionContext.put(...) / get(...) or jobExecutionContext.getMergedJobDataMap()), not on the shared JobDetail map.
```java
@AfterEach
void tearDown() {
  AppRunLogAppender.getActiveBuffers().clear();
  AppRunLogAppender.resetForTest();
}
```
setUp() mutates static state via AppRunLogAppender.setLogDirectoryForTest(...), but tearDown() does not restore it (and resetForTest() currently doesn’t reset logDirectory). This can leak configuration into other test classes in the same JVM. Restore the defaults in tearDown() (or enhance resetForTest() to reset logDirectory/limits and detach any root-logger appender registered by startCapture).
Suggested change:
```java
AppRunLogAppender.resetForTest();
AppRunLogAppender.setLogDirectoryForTest(null);
```
```java
  String batchText = String.join("\n", batch);
  writeToFile(batchText);
  AppRunLogMetrics.recordFlush(appName, batch.size());
}
```
flush() calls AppRunLogMetrics.recordFlush(...), but there is no AppRunLogMetrics class in the codebase (repo-wide search only finds these references). This will fail compilation; either add the missing metrics implementation or remove/replace these calls with an existing metrics facility.
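If the metrics class is meant to exist, a minimal counter-based sketch such as the one below would satisfy the call sites referenced throughout this review. The method signatures are inferred from those call sites (an assumption), and the real project would more likely wire these counters into its existing metrics facility rather than a static map:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical minimal AppRunLogMetrics; signatures are inferred from the
// call sites quoted in this review, not from any real class.
final class AppRunLogMetrics {
  private static final Map<String, LongAdder> counters = new ConcurrentHashMap<>();

  private AppRunLogMetrics() {}

  private static void add(String key, long delta) {
    counters.computeIfAbsent(key, k -> new LongAdder()).add(delta);
  }

  static void recordFlush(String appName, int lines) {
    add(appName + ".flushedLines", lines);
  }

  static void recordRunStarted(String appName, String serverId) {
    add(appName + ".runsStarted", 1);
  }

  static void recordRunCompleted(String appName, String serverId, long durationMs) {
    add(appName + ".runsCompleted", 1);
  }

  static void recordLinesCapture(String appName, long lines) {
    add(appName + ".linesCaptured", lines);
  }

  static void recordCleanup(String appName, int deletedRuns) {
    add(appName + ".runsCleaned", deletedRuns);
  }

  static long get(String key) {
    LongAdder adder = counters.get(key);
    return adder == null ? 0 : adder.sum();
  }
}
```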
```java
void startFlusher() {
  try {
    Files.createDirectories(logFile.getParent());
    writer =
        Files.newBufferedWriter(
            logFile,
            StandardCharsets.UTF_8,
            StandardOpenOption.CREATE,
            StandardOpenOption.APPEND);
  } catch (IOException e) {
    LOG.error("Failed to open log file {}: {}", logFile, e.getMessage());
    return;
  }
```
If startFlusher() fails to open the log file (IOException), it logs and returns, but the buffer remains usable and will keep accumulating pending lines (up to maxLines) without ever flushing. Consider returning a success/failure signal (or throwing) so the caller can stop capture/remove the buffer, or mark the buffer as closed/disabled on failure.
```java
public static RunLogBuffer startCapture(
    String appRunId, String appId, String appName, String serverId, String... threadPrefixes) {
  ensureRegistered();
  cleanupOldRuns(appName);
  Path logFile = resolveLogFile(appName, Long.parseLong(appRunId), serverId);
  RunLogBuffer buffer =
      new RunLogBuffer(
          appId, appName, serverId, Long.parseLong(appRunId), maxLinesPerRun, logFile);
  activeBuffers.put(bufferKey(appName, appRunId), buffer);

  for (String prefix : threadPrefixes) {
    threadPrefixBindings.add(new ThreadPrefixBinding(prefix, buffer));
  }

  buffer.startFlusher();
  AppRunLogMetrics.recordRunStarted(appName, serverId);
  return buffer;
}

public static void stopCapture(String appName, String appRunId) {
  RunLogBuffer buffer = activeBuffers.remove(bufferKey(appName, appRunId));
  if (buffer != null) {
    threadPrefixBindings.removeIf(b -> b.buffer == buffer);
    long durationMs = System.currentTimeMillis() - buffer.getRunTimestamp();
    AppRunLogMetrics.recordRunCompleted(appName, buffer.getServerId(), durationMs);
    AppRunLogMetrics.recordLinesCapture(appName, buffer.getTotalLineCount());
    buffer.close();
  }
}

public static RunLogBuffer getBuffer(String appName, String runTimestamp) {
  return activeBuffers.get(bufferKey(appName, runTimestamp));
}

static void cleanupOldRuns(String appName) {
  List<Long> timestamps = listRunTimestamps(appName);
  if (timestamps.size() <= maxRunsPerApp) {
    return;
  }
  List<Long> toDelete = timestamps.subList(maxRunsPerApp, timestamps.size());
  Path appDir = resolveAppDir(appName);
  for (long ts : toDelete) {
    try (DirectoryStream<Path> stream = Files.newDirectoryStream(appDir, ts + "-*.log")) {
      for (Path entry : stream) {
        Files.deleteIfExists(entry);
      }
    } catch (IOException e) {
      // best-effort cleanup
    }
  }
  AppRunLogMetrics.recordCleanup(appName, toDelete.size());
}
```
startCapture/stopCapture/cleanupOldRuns invoke AppRunLogMetrics.*, but there is no AppRunLogMetrics class in the repository (only these references). This makes the module uncompilable; add the missing metrics class or remove/replace these calls.
```java
buffer.startFlusher();
AppRunLogMetrics.recordRunStarted(appName, serverId);
return buffer;
```
AppRunLogAppender references AppRunLogMetrics (e.g., recordRunStarted) but there is no AppRunLogMetrics class in the codebase, so this will not compile. Either add the missing metrics implementation (and wiring) or remove these calls / replace with the existing metrics facility used elsewhere in the service.
```java
 * <p>Configured via the {@code logging:} section in {@code openmetadata.yaml}. Uses two-tier
 * matching: MDC for the scheduler thread, thread name prefixes for worker threads.
```
The class-level Javadoc says this appender is configured via openmetadata.yaml, but the implementation self-registers programmatically via ensureRegistered() and does not read Dropwizard logging config. Please update the Javadoc to reflect the actual configuration/registration mechanism (and how operators can disable/enable it).
Suggested change:
```java
 * <p>This appender is registered programmatically and is not wired through Dropwizard's
 * {@code logging:} section in {@code openmetadata.yaml}. Call {@link #ensureRegistered()} once
 * during application startup to attach it to the appropriate {@link LoggerContext}. Operators
 * who do not want per-run app logs can avoid invoking {@code ensureRegistered()} (or remove this
 * appender from the {@link LoggerContext} at runtime).
```
```java
void flush() {
  List<String> batch = drainPending();
  if (batch.isEmpty()) {
    return;
  }
  String batchText = String.join("\n", batch);
  writeToFile(batchText);
}
```
flush() drains pending even when the file writer is not available (e.g., if startFlusher() failed to open the log file or was never called). Since writeToFile() returns early when writer == null, this causes buffered log lines to be dropped silently. Consider short-circuiting flush() when writer is null (or lazily opening/retrying the writer) so pending lines are preserved until they can be written.
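One way to avoid the silent drop is to short-circuit before draining, sketched below with a `StringBuilder` standing in for the real `BufferedWriter`. This is a hypothetical model of the behavior, not the actual `RunLogBuffer` API:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: the writer is null until the log file opens; pending
// lines are preserved, not drained, while no writer is available.
class FlushGuardDemo {
  private final List<String> pending = new ArrayList<>();
  private StringBuilder writer; // stands in for the BufferedWriter

  void append(String line) {
    pending.add(line);
  }

  void flush() {
    if (writer == null) {
      return; // keep pending lines until a writer exists, instead of dropping them
    }
    if (pending.isEmpty()) {
      return;
    }
    List<String> batch = new ArrayList<>(pending);
    pending.clear();
    writer.append(String.join("\n", batch)).append('\n');
  }

  void openWriter() {
    writer = new StringBuilder();
  }

  int pendingCount() {
    return pending.size();
  }

  String written() {
    return writer == null ? "" : writer.toString();
  }
}
```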
```java
if (readerStats != null && totalRecords > readerStats.getTotalRecords()) {
  readerStats.setTotalRecords(totalRecords);
```
readerStats.getTotalRecords() can be null (e.g., if reader stats were created elsewhere without initializing totalRecords). The comparison totalRecords > readerStats.getTotalRecords() will then throw an NPE. Please null-guard (treat null as 0) before comparing/updating the reader total.
Suggested change:
```java
if (readerStats != null) {
  Integer readerTotalRecords = readerStats.getTotalRecords();
  int safeReaderTotalRecords = readerTotalRecords != null ? readerTotalRecords : 0;
  if (totalRecords > safeReaderTotalRecords) {
    readerStats.setTotalRecords(totalRecords);
  }
}
```
This reverts commit c160754.
```java
static void resetForTest() {
  registered = false;
  threadPrefixBindings.clear();
}
```
resetForTest() clears flags but doesn't remove the registered APP_RUN_LOG appender from the Logback root logger. Since startCapture() calls ensureRegistered() (using LoggerFactory's global context), unit tests can leave a global appender attached for subsequent tests, causing cross-test interference. Consider having resetForTest() also detach/stop the root appender (if present) and restore any mutated static settings (e.g., logDirectory/maxRunsPerApp).
```java
void flush() {
  List<String> batch = drainPending();
  if (batch.isEmpty()) {
    return;
  }
```
flush() can be invoked by the scheduled flusher thread while close() also calls flush() from the caller thread, but writer access isn't synchronized. This can lead to concurrent writes/flushes on the same BufferedWriter (and races with closeWriter()), potentially corrupting output or throwing IO errors. Consider guarding flush/writeToFile/closeWriter with a lock, or ensuring the final flush/close happens on the flusher thread after cancelling further executions.
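One lock-based arrangement is sketched below, with a `StringBuilder` standing in for the `BufferedWriter`. This is a hypothetical model of the locking discipline, not the real `RunLogBuffer` code:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: one intrinsic lock serializes flush() and close(), so
// the scheduled flusher thread and the closing thread cannot race on the writer.
class SynchronizedFlushDemo {
  private final List<String> pending = new ArrayList<>();
  private final StringBuilder writer = new StringBuilder(); // stands in for BufferedWriter
  private boolean closed = false;

  synchronized void append(String line) {
    if (!closed) {
      pending.add(line);
    }
  }

  synchronized void flush() {
    if (closed || pending.isEmpty()) {
      return;
    }
    List<String> batch = new ArrayList<>(pending);
    pending.clear();
    writer.append(String.join("\n", batch)).append('\n');
  }

  synchronized void close() {
    flush(); // final flush runs under the same lock, so it cannot race a writer close
    closed = true;
  }

  synchronized String written() {
    return writer.toString();
  }
}
```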
```java
for (String jobIdStr : runningJobIds) {
  LOG.info("Stopping distributed job {} via coordinator fallback", jobIdStr);
  coordinator.requestStop(java.util.UUID.fromString(jobIdStr));
}
```
⚠️ Edge Case: Exception in loop aborts stopping remaining distributed jobs
In tryStopOutsideQuartz(), the loop over runningJobIds calls UUID.fromString(jobIdStr) and coordinator.requestStop() without per-iteration exception handling. If any single job ID is malformed or requestStop throws, the remaining jobs in the list will not be stopped.
While the outer tryStopViaApp catch block prevents an unhandled exception from propagating further, it means a single failure silently prevents cleanup of all subsequent running jobs.
Suggested fix:
```java
for (String jobIdStr : runningJobIds) {
  try {
    LOG.info("Stopping distributed job {} via coordinator fallback", jobIdStr);
    coordinator.requestStop(java.util.UUID.fromString(jobIdStr));
  } catch (Exception e) {
    LOG.warn("Failed to stop distributed job {}", jobIdStr, e);
  }
}
```
openmetadata-service/src/main/java/org/openmetadata/service/apps/scheduler/AppScheduler.java
```java
static String formatLine(ILoggingEvent event) {
  return String.format(
      "{\"timestamp\":%d,\"level\":\"%s\",\"thread\":\"%s\",\"logger\":\"%s\",\"message\":\"%s\"}",
      event.getTimeStamp(),
      event.getLevel(),
      escapeJson(event.getThreadName()),
      escapeJson(event.getLoggerName()),
      escapeJson(event.getFormattedMessage()));
}

private static String escapeJson(String value) {
  if (value == null) {
    return "";
  }
  return value
      .replace("\\", "\\\\")
      .replace("\"", "\\\"")
      .replace("\n", "\\n")
      .replace("\r", "\\r")
      .replace("\t", "\\t");
}
```
formatLine()/escapeJson() builds JSON manually but only escapes a small subset of characters. This can produce invalid JSON for other control characters (e.g., backspace/formfeed or other 0x00–0x1F chars) and is easy to drift from the server’s canonical JSON log format. Consider using the existing JsonUtils/ObjectMapper (or a Logback JSON encoder/layout) to serialize a structured object/map so escaping and formatting remain correct and consistent.
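For comparison, a spec-complete escaper must handle every control character below 0x20 (RFC 8259 requires this). The following self-contained sketch is illustrative only and is not the server's canonical formatter:

```java
// Illustrative escaper covering all JSON-mandatory escapes, including the
// 0x00-0x1F control characters the reviewed implementation misses.
class JsonEscapeDemo {
  static String escapeJson(String value) {
    if (value == null) {
      return "";
    }
    StringBuilder sb = new StringBuilder(value.length() + 8);
    for (int i = 0; i < value.length(); i++) {
      char c = value.charAt(i);
      switch (c) {
        case '\\': sb.append("\\\\"); break;
        case '"':  sb.append("\\\""); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        case '\b': sb.append("\\b"); break;
        case '\f': sb.append("\\f"); break;
        default:
          if (c < 0x20) {
            sb.append(String.format("\\u%04x", (int) c)); // remaining control chars
          } else {
            sb.append(c);
          }
      }
    }
    return sb.toString();
  }
}
```

In practice, delegating to an existing `ObjectMapper` (as the comment suggests) avoids maintaining this table by hand.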
```java
AppRunRecord latestRun = appRepository.getLatestAppRuns(app);
if (latestRun != null && latestRun.getStatus() == AppRunRecord.Status.RUNNING) {
```
💡 Edge Case: Null check on getLatestAppRuns is dead code — method throws
AppRepository.getLatestAppRuns(App) throws an exception (via orElseThrow) when no run record exists, rather than returning null. The latestRun != null guard on line 108 is therefore dead code — the absence case is instead handled by the catch (Exception e) block, which logs a warning.
This works correctly in practice since the catch block is resilient, but the null check is misleading to future readers who may assume the method can return null.
Suggested fix:
Either:
1. Use the Optional-returning variant (`getLatestExtensionByIdOptional`) instead so the null check is meaningful, or
2. Remove the null check and add a comment noting the exception is intentionally caught below.
```java
// Mirror of EXCLUDED_FIELDS — can't reference it directly
// because SearchIndex has a static initializer that requires Entity to be bootstrapped.

class SearchIndexTest {

  private static final Set<String> EXCLUDED_FIELDS =
      Set.of(
          "changeDescription",
          "incrementalChangeDescription",
          "upstreamLineage.pipeline.changeDescription",
          "upstreamLineage.pipeline.incrementalChangeDescription",
          "connection",
          "changeSummary");
```
This test hardcodes a local copy of the production excluded-field list. That duplication is likely to drift from the real list over time, making the test less reliable. Consider moving the excluded-field set to a shared location that can be referenced safely in tests (e.g., a constant in SearchIndexUtils), or exposing it from SearchIndex without triggering heavy static initialization.
```java
private static final Set<String> EXCLUDED_FIELDS =
    Set.of(
        "changeDescription",
        "incrementalChangeDescription",
        "upstreamLineage.pipeline.changeDescription",
        "upstreamLineage.pipeline.incrementalChangeDescription",
        "connection",
```
EXCLUDED_FIELDS includes dotted-path entries like upstreamLineage.pipeline.changeDescription, but the current SearchIndexUtils.removeFieldByPath(...) implementation only navigates the first path element and then removes the last key from that same map, so deeper paths are not actually removed. Consider adding a test here that asserts dotted-path removal works and updating removeFieldByPath to traverse the full path before removing the final key (otherwise these exclusions won't take effect).
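A traversal that walks the full dotted path before removing the final key could look like the sketch below. The method name mirrors this comment, but the signature and null-handling are assumptions rather than the actual `SearchIndexUtils` API:

```java
import java.util.Map;

// Hypothetical sketch of dotted-path removal; the real SearchIndexUtils
// signature may differ.
class RemoveFieldDemo {
  @SuppressWarnings("unchecked")
  static void removeFieldByPath(Map<String, Object> doc, String path) {
    String[] parts = path.split("\\.");
    Map<String, Object> current = doc;
    // Walk every intermediate segment instead of stopping after the first one.
    for (int i = 0; i < parts.length - 1; i++) {
      Object next = current.get(parts[i]);
      if (!(next instanceof Map)) {
        return; // path does not fully exist; nothing to remove
      }
      current = (Map<String, Object>) next;
    }
    current.remove(parts[parts.length - 1]);
  }
}
```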



Summary by Gitar

- `AppRunLogAppender` and `RunLogBuffer` classes capture per-run app logs to JSON files with thread-name prefix matching and MDC-based filtering
- `OmAppJobListener` integrated to start/stop log capture with cleanup on job veto or completion
- `SearchIndexApp.uninstall()` now purges all related state tables and force-stops active distributed jobs
- `SearchIndexExecutor` adds periodic sink stats synchronization and adjusts entity totals when success+failed exceed initial counts
- `DistributedIndexingStrategy` and `DistributedSearchIndexExecutor` add proper shutdown handling and interrupt ordering for lock/heartbeat threads
- `ReindexingOrchestrator` persists the run record before the WebSocket broadcast to ensure the UI sees terminal status
- Added `deleteAll()` methods to `SearchIndexJobDAO`, `SearchIndexPartitionDAO`, `SearchIndexServerStatsDAO`, and `AppExtensionTimeSeriesDao`