@@ -90,6 +90,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String REPOSITORY_CONTENT_PREFIX = "nifi.content.repository.directory.";
public static final String CONTENT_REPOSITORY_IMPLEMENTATION = "nifi.content.repository.implementation";
public static final String MAX_APPENDABLE_CLAIM_SIZE = "nifi.content.claim.max.appendable.size";
public static final String CONTENT_CLAIM_TRUNCATION_ENABLED = "nifi.content.claim.truncation.enabled";
public static final String CONTENT_ARCHIVE_MAX_RETENTION_PERIOD = "nifi.content.repository.archive.max.retention.period";
public static final String CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE = "nifi.content.repository.archive.max.usage.percentage";
public static final String CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE = "nifi.content.repository.archive.backpressure.percentage";
@@ -349,6 +350,7 @@ public class NiFiProperties extends ApplicationProperties {
public static final String DEFAULT_NAR_LIBRARY_AUTOLOAD_DIR = "./extensions";
public static final String DEFAULT_FLOWFILE_CHECKPOINT_INTERVAL = "20 secs";
public static final String DEFAULT_MAX_APPENDABLE_CLAIM_SIZE = "50 KB";
public static final String DEFAULT_CONTENT_CLAIM_TRUNCATION_ENABLED = "true";
public static final int DEFAULT_QUEUE_SWAP_THRESHOLD = 20000;
public static final long DEFAULT_BACKPRESSURE_COUNT = 10_000L;
public static final String DEFAULT_BACKPRESSURE_SIZE = "1 GB";
@@ -1418,6 +1420,10 @@ public String getMaxAppendableClaimSize() {
return getProperty(MAX_APPENDABLE_CLAIM_SIZE, DEFAULT_MAX_APPENDABLE_CLAIM_SIZE);
}

public boolean isContentClaimTruncationEnabled() {
return Boolean.parseBoolean(getProperty(CONTENT_CLAIM_TRUNCATION_ENABLED, DEFAULT_CONTENT_CLAIM_TRUNCATION_ENABLED));
}

@Override
public String getProperty(final String key, final String defaultValue) {
final String value = getProperty(key);
1 change: 1 addition & 0 deletions nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -3024,6 +3024,7 @@ For example, if `nifi.content.repository.archive.max.usage.percentage` is `50%`
|`nifi.content.repository.archive.enabled`|To enable content archiving, set this to `true` and specify a value for the `nifi.content.repository.archive.max.usage.percentage` property above. Content archiving enables the provenance UI to view or replay content that is no longer in a dataflow queue. By default, archiving is enabled.
|`nifi.content.repository.always.sync`|If set to `true`, any change to the repository will be synchronized to the disk, meaning that NiFi will ask the operating system not to cache the information. This is very expensive and can significantly reduce NiFi performance. However, if it is `false`, there could be the potential for data loss if either there is a sudden power loss or the operating system crashes. The default value is `false`.
|`nifi.content.repository.archive.cleanup.frequency`| The frequency with which to schedule the content archive clean up task. The default value is `1 Minute`. A value lower than `1 Second` is not allowed.
|`nifi.content.claim.truncation.enabled`|When a Content Repository file is shared by many FlowFiles (see `nifi.content.claim.max.appendable.size`), the file cannot be deleted until every FlowFile that references it has been removed. If the last FlowFile written to such a file is itself large and is removed while earlier FlowFiles in the same file are still in use, NiFi can truncate the file at the offset where that final FlowFile began, reclaiming the disk space it occupied without touching any of the earlier FlowFiles. Truncation only ever applies to this trailing FlowFile; it is not a general defragmentation mechanism and does not reclaim space from FlowFiles in the middle of the file. When `nifi.content.repository.archive.enabled` is `false`, truncation runs whenever a trailing FlowFile becomes eligible. When archiving is enabled, truncation runs only while the container is under archive disk pressure (see `nifi.content.repository.archive.max.usage.percentage`), so that archiving handles reclamation under normal conditions and truncation supplements it when the archive cannot keep up. Set this property to `false` to disable tail-claim truncation entirely; doing so is always safe and simply forfeits this disk-reclamation optimization. The default value is `true`.
|====
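The gating rule described in the `nifi.content.claim.truncation.enabled` entry above can be sketched as a small decision function. This is an illustrative sketch only; the method and parameter names are assumptions, not NiFi's actual internal API:

```java
// Sketch of the truncation gating rule: when archiving is disabled, truncation
// runs whenever a trailing FlowFile becomes eligible; when archiving is enabled,
// truncation only supplements the archive under disk pressure.
public class TruncationGate {
    /**
     * @param truncationEnabled    value of nifi.content.claim.truncation.enabled
     * @param archiveEnabled       value of nifi.content.repository.archive.enabled
     * @param underArchivePressure true when the container exceeds
     *                             nifi.content.repository.archive.max.usage.percentage
     */
    public static boolean shouldTruncate(boolean truncationEnabled, boolean archiveEnabled, boolean underArchivePressure) {
        if (!truncationEnabled) {
            return false; // feature disabled entirely; always safe
        }
        if (!archiveEnabled) {
            return true;  // no archive: truncate whenever a trailing FlowFile is eligible
        }
        return underArchivePressure; // archive handles reclamation unless it cannot keep up
    }
}
```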

=== Provenance Repository
@@ -44,4 +44,12 @@ public interface ContentClaim extends Comparable<ContentClaim> {
* @return the length of this ContentClaim
*/
long getLength();

/**
* Indicates whether this ContentClaim is a candidate for truncation.
* @return true if this ContentClaim is a candidate for truncation, false otherwise
*/
default boolean isTruncationCandidate() {
return false;
}
}
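A hypothetical implementation might report itself as a truncation candidate when it sits at a non-zero offset in its backing file and is large enough to be worth reclaiming. The class name, field names, and the 1 MB threshold below are illustrative assumptions, not taken from NiFi's actual implementation:

```java
// Illustrative ContentClaim-style implementation; the threshold constant and
// field names are assumptions for the sketch.
public class ExampleContentClaim {
    private static final long TRUNCATION_THRESHOLD = 1024 * 1024; // 1 MB, assumed

    private final long offset;
    private final long length;

    public ExampleContentClaim(long offset, long length) {
        this.offset = offset;
        this.length = length;
    }

    // Mirrors the shape of ContentClaim#isTruncationCandidate: only a trailing
    // claim (non-zero offset into a shared file) whose content is large enough
    // to be worth reclaiming reports itself as a candidate.
    public boolean isTruncationCandidate() {
        return offset > 0 && length >= TRUNCATION_THRESHOLD;
    }
}
```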
@@ -112,6 +112,55 @@ public interface ResourceClaimManager {
*/
void markDestructable(ResourceClaim claim);

/**
* Indicates that the Resource Claim associated with the given Content Claim can now be
* truncated to the start of the ContentClaim. This should only ever be called after it is
* guaranteed that the FlowFile Repository has been synchronized with its underlying
* storage component for the same reason as described in the {@link #markDestructable(ResourceClaim)}
* method.
*
* <p>If the given ContentClaim currently has a positive truncation reference count
* (see {@link #getTruncationReferenceCount(ContentClaim)}), the claim will not be
* enqueued for truncation because at least one FlowFile still references it.</p>
*
* @param claim the ContentClaim whose backing Resource Claim should be truncated at the claim's offset
*/
void markTruncatable(ContentClaim claim);

/**
* Increments the truncation reference count for the given ContentClaim. This count tracks
* how many live FlowFiles reference a truncation-eligible ContentClaim and is used exclusively
* for truncation safety gating. Only truncation-eligible claims (non-zero offset, length above
* the truncation threshold) should be tracked via this method. Callers should not treat this
* as a general-purpose ContentClaim reference count.
*
* @param claim the ContentClaim whose truncation reference count should be incremented
*/
void incrementTruncationReferenceCount(ContentClaim claim);

/**
* Decrements the truncation reference count for the given ContentClaim. When the count reaches
* zero, the entry is removed from tracking. This method is used exclusively for truncation safety
* gating. Only truncation-eligible claims should be tracked. Callers should not treat this as a
* general-purpose ContentClaim reference count.
*
* @param claim the ContentClaim whose truncation reference count should be decremented
* @return the new truncation reference count after decrementing, or 0 if the entry was removed
*/
int decrementTruncationReferenceCount(ContentClaim claim);

/**
* Returns the current truncation reference count for the given ContentClaim. This count is
* used exclusively to determine whether it is safe to truncate a ContentClaim's underlying
* ResourceClaim. A positive count indicates that at least one FlowFile still references the
* claim and truncation must not proceed. A return value of 0 indicates that the claim is not
* being tracked (either it was never tracked or all references have been removed).
*
* @param claim the ContentClaim to query
* @return the current truncation reference count, or 0 if the claim is not tracked
*/
int getTruncationReferenceCount(ContentClaim claim);
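One plausible way to back the three reference-counting methods above is an atomic map that drops entries when they reach zero. This sketch is an assumption about the implementation, keyed by a `String` claim id for brevity rather than by the `ContentClaim` itself:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Sketch of the truncation reference-counting contract. A real manager would
// key by ContentClaim; a String id is used here to keep the example small.
public class TruncationRefCounter {
    private final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<>();

    public void increment(String claimId) {
        counts.merge(claimId, 1, Integer::sum);
    }

    // Returns the new count; removes the entry entirely when it reaches zero,
    // matching decrementTruncationReferenceCount's documented behavior.
    public int decrement(String claimId) {
        Integer updated = counts.computeIfPresent(claimId, (id, count) -> count <= 1 ? null : count - 1);
        return updated == null ? 0 : updated;
    }

    // 0 means "not tracked": either never incremented or fully decremented.
    public int get(String claimId) {
        return counts.getOrDefault(claimId, 0);
    }
}
```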

/**
* Drains up to {@code maxElements} Content Claims from the internal queue
* of destructable content claims to the given {@code destination} so that
@@ -138,6 +187,16 @@ public interface ResourceClaimManager {
*/
void drainDestructableClaims(Collection<ResourceClaim> destination, int maxElements, long timeout, TimeUnit unit);

/**
* Drains up to {@code maxElements} Content Claims from the internal queue
* of truncatable content claims to the given {@code destination} so that
* they can be truncated.
*
* @param destination to drain to
* @param maxElements max items to drain
*/
void drainTruncatableClaims(Collection<ContentClaim> destination, int maxElements);
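A background task consuming the truncatable-claims queue might look like the following sketch: drain a batch, then truncate each claim's backing file at the claim's offset. The `ClaimInfo` record stands in for NiFi's `ContentClaim` plus the repository's file lookup; both it and the batching are assumptions for illustration:

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;

// Sketch of a cleanup task that truncates backing files for drained claims.
// ClaimInfo is a hypothetical stand-in for a ContentClaim resolved to its file.
public class TruncationTask {
    record ClaimInfo(Path file, long offset) { }

    // Truncate each claim's file so that bytes at and beyond the claim's
    // offset (the trailing FlowFile's content) are released back to the OS.
    static void truncateAll(List<ClaimInfo> drained) throws IOException {
        for (ClaimInfo claim : drained) {
            try (FileChannel channel = FileChannel.open(claim.file(), StandardOpenOption.WRITE)) {
                if (channel.size() > claim.offset()) {
                    channel.truncate(claim.offset());
                }
            }
        }
    }
}
```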

/**
* Clears the manager's memory of any and all ResourceClaims that it knows
* about
@@ -2981,6 +2981,11 @@ public OutputStream write(FlowFile source) {
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);
ensureNotAppending(newClaim);

// Build an OutputStream that we can return to the caller. Note that the returned OutputStream is wrapped with multiple layers
// of OutputStream, each with its own purpose. This layering is important for driving the capabilities that are necessary at the
// framework level. For example, we intercept flushes and closes to ensure that the framework is able to efficiently manage what
// gets written to the Content Repository and manage the full lifecycle of the Content Repository's OutputStream. When the
// ProcessSession is committed or rolled back, we ensure that the underlying streams are closed and flushed appropriately.
final OutputStream rawStream = claimCache.write(newClaim);
final OutputStream nonFlushable = new NonFlushableOutputStream(rawStream);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushable);
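The layering described in the comment above can be illustrated with a minimal flush-suppressing wrapper. This is only a sketch of the idea behind such a layer, not NiFi's actual `NonFlushableOutputStream`:

```java
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Minimal sketch of a flush-suppressing layer: processor-level flush() calls
// become no-ops so the framework alone decides when the underlying Content
// Repository stream is actually flushed.
public class SuppressFlushOutputStream extends FilterOutputStream {
    public SuppressFlushOutputStream(OutputStream out) {
        super(out);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len); // delegate in bulk; FilterOutputStream would write byte-by-byte
    }

    @Override
    public void flush() {
        // Intentionally a no-op: only the framework flushes the wrapped stream.
    }
}
```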
@@ -3118,6 +3123,12 @@ public FlowFile write(FlowFile source, final OutputStreamCallback writer) {
claimLog.debug("Creating ContentClaim {} for 'write' for {}", newClaim, source);

ensureNotAppending(newClaim);

// Build an OutputStream that we can return to the caller. Note that the returned OutputStream is wrapped with multiple layers
// of OutputStream, each with its own purpose. This layering is important for driving the capabilities that are necessary at the
// framework level. For example, we intercept flushes and closes to ensure that the framework is able to efficiently manage what
// gets written to the Content Repository and manage the full lifecycle of the Content Repository's OutputStream. When the
// ProcessSession is committed or rolled back, we ensure that the underlying streams are closed and flushed appropriately.
try (final OutputStream stream = claimCache.write(newClaim);
final NonFlushableOutputStream nonFlushableOutputStream = new NonFlushableOutputStream(stream);
final OutputStream disableOnClose = new DisableOnCloseOutputStream(nonFlushableOutputStream);
@@ -3405,6 +3416,11 @@ public FlowFile write(FlowFile source, final StreamCallback writer) {
claimCache.flush(currClaim.getResourceClaim());
}

// Build an InputStream and an OutputStream that we can return to the caller. Note that the returned streams are wrapped with multiple layers,
// each with its own purpose. This layering is important for driving the capabilities that are necessary at the
// framework level. For example, we intercept flushes and closes to ensure that the framework is able to efficiently manage what
// gets written to the Content Repository and manage the full lifecycle of the Content Repository's OutputStream. When the
// ProcessSession is committed or rolled back, we ensure that the underlying streams are closed and flushed appropriately.
try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset(), true);
final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);