PECOBLR-1121 Arrow patch to circumvent Arrow issues with JDK 16+. #1156
base: main
Conversation
Patch Arrow to create a Databricks ArrowBuf which allocates memory on the heap and provides access to it through Java methods. This removes the need to specify "--add-opens=java.base/java.nio=ALL-UNNAMED" as JVM args for JDK 16+.
Use native Arrow if available; otherwise fall back to the patched version.
Remove irrelevant reference counting in patch code. The patched code uses heap memory for Arrow operations, so reference counting is not required.
Remove redundant todos for accounting.
Patch DecimalUtility to not use unsafe methods to set decimal values on DatabricksArrowBuf.
Add notice to all patched Arrow Java code. In NOTICE file mention Arrow has been patched by Databricks.
On static-init failure of the MemoryUtil class, Arrow prints a stack trace to stderr. Remove this print, since we now fall back to DatabricksBufferAllocator when this happens, and the error is logged as well.
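The core idea of the patch, a buffer backed by heap memory that is read and written only through public java.nio APIs, can be sketched as follows. This is an illustrative sketch, not the actual DatabricksArrowBuf; the class and method names here are hypothetical. Because it never touches JDK internals reflectively, it needs no `--add-opens` flag on JDK 16+.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: a heap-backed buffer exposing reads/writes through
// public java.nio APIs only, so no reflective access to JDK internals is needed.
public class HeapBufSketch {
    private final ByteBuffer buffer; // heap buffer: ByteBuffer.allocate, not allocateDirect

    public HeapBufSketch(int capacity) {
        this.buffer = ByteBuffer.allocate(capacity);
    }

    public void setInt(int index, int value) {
        buffer.putInt(index, value); // absolute put: no position bookkeeping
    }

    public int getInt(int index) {
        return buffer.getInt(index);
    }

    public static void main(String[] args) {
        HeapBufSketch buf = new HeapBufSketch(16);
        buf.setInt(0, 42);
        if (buf.getInt(0) != 42) throw new AssertionError("round-trip failed");
        System.out.println("ok");
    }
}
```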
// ---- Databricks patch start ----
private final HistoricalLog historicalLog =
    DEBUG ? new HistoricalLog(DEBUG_LOG_LENGTH, "ArrowBuf[%d]", id) : null;
do we even need to worry about the historical log? can we not just set it to null? seems like its usage is null checked everywhere anyway?
+1, add a comment on why this is needed
The idea is to keep the patch minimal with respect to the Arrow code.
long currentReservation = reservedSize.get();
long newReservation = currentReservation + nBytes;
if (newReservation > allocator.getHeadroom() + currentReservation) {
  return false;
}
reservedSize.addAndGet(nBytes);
this is not thread safe, should we use compareAndSet?
The interface that this class implements, AllocationReservation, is explicitly marked as not thread-safe, so we are following the contract. Also, this code is generally called from a single thread.
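For context on the compareAndSet suggestion: the snippet above has a check-then-act race if called from multiple threads. A minimal sketch of how a CAS loop would close that window, using hypothetical names and a simplified headroom check (the original's `newReservation > headroom + currentReservation` reduces to `nBytes > headroom`):

```java
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch (names hypothetical): a CAS loop making check-then-reserve
// atomic, in case thread safety were ever required for the reservation.
public class ReserveSketch {
    private final AtomicLong reservedSize = new AtomicLong();
    private final long headroom;

    public ReserveSketch(long headroom) { this.headroom = headroom; }

    public boolean reserve(long nBytes) {
        while (true) {
            long current = reservedSize.get();
            if (nBytes > headroom) { // simplified form of the headroom check above
                return false;
            }
            // compareAndSet fails and retries if another thread raced us here
            if (reservedSize.compareAndSet(current, current + nBytes)) {
                return true;
            }
        }
    }

    public long reserved() { return reservedSize.get(); }

    public static void main(String[] args) {
        ReserveSketch r = new ReserveSketch(100);
        if (!r.reserve(40)) throw new AssertionError();
        if (r.reserved() != 40) throw new AssertionError();
        if (r.reserve(200)) throw new AssertionError("should exceed headroom");
        System.out.println("ok");
    }
}
```

Since AllocationReservation is documented as not thread-safe, following the existing contract (as the author notes) is also a defensible choice.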
@Override
public ReferenceManager getReferenceManager() {
  return referenceManager;
}

@Override
public long capacity() {
  return capacity;
}
I think a bunch of these methods do not change the overridden behaviour; can we remove them so that it is easier to review and maintain?
The idea is to capture all behaviour within a single class and make everything explicit. It also avoids depending on any changes in the base class ArrowBuf.
if (capacity > Integer.MAX_VALUE) {
  throw new IllegalArgumentException(
      "DatabricksArrowBuf does not support capacity > Integer.MAX_VALUE");
}
- why this limit?
- this is missing from the other constructor
- can we reuse constructors
ByteBuffer.allocate takes an integer argument; it is an inherent limitation of java.nio ByteBuffer. In the JDBC case the maximum allocation will be one chunk, which is 20 MiB as of today.
Reuse constructor - I will give it more thought and get back.
In one case the constructor is allocating the ByteBuffer, and the check has to happen before that. In the other case the buffer is being sliced. Delegating to one single constructor is not clean in this case.
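The Integer.MAX_VALUE limit discussed above comes from `ByteBuffer.allocate(int)` taking an int capacity. A minimal sketch of the pattern, with a hypothetical helper name:

```java
import java.nio.ByteBuffer;

public class CapacityCheckSketch {
    // Hypothetical helper mirroring the check: a long capacity must fit in an
    // int, because ByteBuffer.allocate(int) caps out at Integer.MAX_VALUE.
    static ByteBuffer allocateChecked(long capacity) {
        if (capacity > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "heap buffer does not support capacity > Integer.MAX_VALUE");
        }
        return ByteBuffer.allocate((int) capacity);
    }

    public static void main(String[] args) {
        if (allocateChecked(1024).capacity() != 1024) throw new AssertionError();
        boolean threw = false;
        try {
            allocateChecked((long) Integer.MAX_VALUE + 1);
        } catch (IllegalArgumentException e) {
            threw = true; // the guard fires before any allocation is attempted
        }
        if (!threw) throw new AssertionError("expected IllegalArgumentException");
        System.out.println("ok");
    }
}
```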
static {
  RootAllocator rootAllocator = null;
  try {
    rootAllocator = new RootAllocator();
we were using Integer.MAX_VALUE, not needed now?
Internally, RootAllocator delegates with Integer.MAX_VALUE as the limit.
// ---- to avoid unsafe allocation initialization errors.
public static final String DEBUG_ALLOCATOR = "arrow.memory.debug.allocator";
public static final int DEBUG_LOG_LENGTH = 6;
public static final boolean DEBUG;
nit: rename to better name than debug?
This code is copied verbatim from another Arrow class, BaseAllocator.
        + "(See https://arrow.apache.org/docs/java/install.html)",
    e);
// ---- Databricks patch start ----
// ---- Remove 'failure.printStackTrace();'
can we log the stack trace?
This code explicitly removes the stack-trace print to stderr to prevent customers from thinking something is broken, since our fix works when native Arrow is absent as well. The exception is caught and the error message is logged in the static initializer of ArrowBufferAllocator.
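The catch-and-log pattern described above can be sketched as follows. This is an illustrative sketch using java.util.logging to stay self-contained; the actual driver logs through its own logger, and the `initNative` method here is a hypothetical stand-in for the initialization that can fail on JDK 16+.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Sketch: instead of printing a stack trace to stderr, catch the failure and
// log it at a level that does not alarm users, then signal the fallback.
public class InitFallbackSketch {
    private static final Logger LOG = Logger.getLogger(InitFallbackSketch.class.getName());

    static boolean initNative() {
        try {
            // Placeholder for native-memory initialization that fails on JDK 16+.
            throw new IllegalStateException("simulated init failure");
        } catch (Throwable t) {
            // No t.printStackTrace(): the failure is expected and handled.
            LOG.log(Level.FINE, "falling back to heap allocator", t);
            return false;
        }
    }

    public static void main(String[] args) {
        if (initNative()) throw new AssertionError("expected fallback");
        System.out.println("ok");
    }
}
```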
@Override
public ArrowBuf allocateBuffer() {
  assertNotUsed();
  if (!used.compareAndSet(false, true)) {
can we add logging?
Sure. I will add it to more classes as well.
jayantsing-db
left a comment
In progress.
RootAllocator rootAllocator = null;
try {
  rootAllocator = new RootAllocator();
} catch (Throwable t) {
Can this be more specific, like java.lang.reflect.InaccessibleObjectException? This is so that we know when we want to fall back to the custom allocator. Otherwise, we could be falling back for an unknown reason.
Different JVM versions throw different exceptions, so matching a specific type becomes brittle and error-prone. Also, nothing inside RootAllocator should throw other than class-initialisation failure. This is the catch-all safe option.
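The catch-all static-initializer fallback being debated can be sketched as follows. All names here are hypothetical, and the simulated failure stands in for whatever the JVM actually throws (InaccessibleObjectException, ExceptionInInitializerError, and so on depending on the JDK version).

```java
// Sketch of the catch-all fallback: attempt the preferred initialization once
// in a static block and fall back on any Throwable, since different JVM
// versions surface the failure as different exception types.
public class AllocatorChoiceSketch {
    static final String ALLOCATOR;

    static {
        String chosen;
        try {
            chosen = tryNativeAllocator();
        } catch (Throwable t) {       // catch-all: exact type varies by JDK
            chosen = "heap-fallback";
        }
        ALLOCATOR = chosen;           // final field assigned exactly once
    }

    static String tryNativeAllocator() {
        // Simulate a JDK 16+ failure inside native-memory initialization.
        throw new ExceptionInInitializerError("simulated");
    }

    public static void main(String[] args) {
        if (!"heap-fallback".equals(ALLOCATOR)) throw new AssertionError();
        System.out.println("ok");
    }
}
```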
static {
  RootAllocator rootAllocator = null;
  try {
    rootAllocator = new RootAllocator();
Is instantiating a root allocator object sufficient/robust signal to fallback to custom allocator? For example, for resource/IO wrapper objects like root allocator, Arrow may choose to just create a lightweight object/handle and the off-heap memory is not unsafe-accessed until a memory is explicitly buffered using that root allocator object. So, just creating a root allocator object may succeed but as the arrow reader proceeds to buffer off-heap memory during runtime, then it fails.
Should we do something like this to be deliberate about the fallback logic?
Class<?> unsafeClass = Class.forName....
Field f = ....
f.setAccessible(true);
Awesome catch! Changed the logic to write and check.
Added tests for these as well. See stack-1.
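A deliberate probe along the lines the reviewer suggests can be sketched as follows. This is an assumption-laden illustration, not the driver's actual check: it attempts the reflective access itself rather than just constructing an object. The `address` field on java.nio.Buffer is the kind of internal that direct-memory code pokes at; `setAccessible` on it fails with InaccessibleObjectException on JDK 16+ unless `--add-opens=java.base/java.nio=ALL-UNNAMED` is set.

```java
import java.lang.reflect.Field;

// Sketch of a "try the actual access" probe; falls back if reflection is denied.
public class AccessProbeSketch {
    static boolean canOpenNioInternals() {
        try {
            Field address = java.nio.Buffer.class.getDeclaredField("address");
            address.setAccessible(true); // denied on JDK 16+ without --add-opens
            return true;
        } catch (Throwable t) {          // InaccessibleObjectException, etc.
            return false;
        }
    }

    public static void main(String[] args) {
        // Whichever way it resolves, the probe must be deterministic.
        if (canOpenNioInternals() != canOpenNioInternals()) throw new AssertionError();
        System.out.println("ok");
    }
}
```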
// ---- Databricks patch start ----
// ---- Copied verbatim from BaseAllocator. We avoid initializing static fields of BaseAllocator
// ---- to avoid unsafe allocation initialization errors.
How do we ensure that the BaseAllocator class is not loaded (and hence the breaking static code inside it) elsewhere during runtime on JDK 16+?
Tests should catch this. Please see DatabricksArrowPatchReaderWriterTest.
// Initialize this before DEFAULT_CONFIG as DEFAULT_CONFIG will eventually initialize the
// allocation manager,
// which in turn allocates an ArrowBuf, which requires DEBUG to have been properly initialized
Since this is already within the patch block, we can remove this potentially confusing comment from the BaseAllocator class.
Copied code verbatim. Easier to diff.
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ArrowBuf.class);

// ---- Databricks patch start ----
// ---- Copied verbatim from BaseAllocator. We avoid initializing static fields of BaseAllocator
qq: What was the procedure to determine that "we need to break the static dependency chain to BaseAllocator" because it ends up using unsafe? Is it based on empirical analysis or some deterministic code scan?
The reason I am asking is that there could be a whole bunch of classes loading in this arrow parsing path which might end up using unsafe and ultimately break in exciting new ways.
You are right. The only way to validate is through tests. Please see the PRs for stack-1 and stack-3.
// This exception will get swallowed, but it's necessary for the static analysis that ensures
// the static fields above get initialized
nit: Do we need to ensure that this exception is indeed correctly swallowed? If unsure, should we just stop throwing the exception in the patch? (Although I agree with the above philosophy of keeping the patched lines to a minimum.)
This is Arrow code. No changes from our end.
@Override
public ArrowBuf deriveBuffer(ArrowBuf sourceBuffer, long index, long length) {
  Preconditions.checkArgument(
      length <= Integer.MAX_VALUE,
Should this be index + length?
index + length <= sourceBuffer.capacity() check is present after this line.
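The two-stage bounds validation discussed here, a length check followed by an `index + length <= capacity` check, can be sketched on plain ByteBuffers. This is an illustrative sketch with hypothetical names, not the patch's actual deriveBuffer; the derived buffer shares the parent's backing storage, as described in the patch.

```java
import java.nio.ByteBuffer;

// Sketch: derive a child buffer sharing the parent's backing storage,
// with the length and capacity bounds checks discussed above.
public class DeriveSketch {
    static ByteBuffer derive(ByteBuffer source, long index, long length) {
        if (length > Integer.MAX_VALUE) {
            throw new IllegalArgumentException("length must fit in an int");
        }
        if (index + length > source.capacity()) { // the follow-up check
            throw new IllegalArgumentException("slice exceeds source capacity");
        }
        ByteBuffer dup = source.duplicate();      // shares the same backing array
        dup.position((int) index);
        dup.limit((int) (index + length));
        return dup.slice();                       // view of [index, index + length)
    }

    public static void main(String[] args) {
        ByteBuffer parent = ByteBuffer.allocate(16);
        parent.putInt(4, 99);
        ByteBuffer child = derive(parent, 4, 8);
        if (child.getInt(0) != 99) throw new AssertionError("slice should see parent data");
        System.out.println("ok");
    }
}
```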
// Create a new DatabricksArrowBuf sharing the same byte buffer.
DatabricksArrowBuf buf = checkBufferType(sourceBuffer);
return new DatabricksArrowBuf(
    this, null, buf.getByteBuffer(), buf.getOffset() + (int) index, length);
This always sets the buffer manager to null, so reallocIfNeeded will always throw on slices.
@Override
public ArrowBuf reallocIfNeeded(final long size) {
Preconditions.checkArgument(size >= 0, "reallocation size must be non-negative");
if (this.capacity() >= size) {
return this;
}
if (bufferManager != null) {
return bufferManager.replace(this, size);
} else {
throw new UnsupportedOperationException(
"Realloc is only available in the context of operator's UDFs");
}
}
In principle, this may be OKAY because JDBC is read-only from native bytes and reallocIfNeeded is never called?
The code is exactly as it is in ArrowBuf. Their code path also has cases where bufferManager is null; it is an artifact of how the code is called.
@Override
public boolean release(int decrement) {
  return getRefCount() == 0;
This will always return false. Could this cause any unknown issues? For example, in the chain of calls, when an arrow reader is closed -> vectors are closed -> buffer is released; could this method (always returning false), cause any unintended leaks by short-circuiting any close chain? I understand that this doesn't matter for on-heap arrow bufs.
I have read through the code, it should not.
@Override
public ArrowBuf retain(ArrowBuf srcBuffer, BufferAllocator targetAllocator) {
  DatabricksArrowBuf buf = checkBufferType(srcBuffer);
Should we assert target allocator type too?
targetAllocator is unused in our case.
Quick question (while I am excited to see this go to production): how do we want to handle fragility and maintainability here? My preference would be to put this patch in the arrow-java repo under the Databricks module so we minimize the chances of things breaking or drifting over time.
Instantiating RootAllocator is insufficient to check that Unsafe memory operations are permitted in Arrow. Writing to an allocated object to validate that it works.

Description
Databricks server shares query results in Arrow format for easy cross language functionality. The JDBC driver experiences compatibility issues with JDK 16 and later versions when processing Arrow results.
This problem arises from stricter encapsulation of internal APIs in newer Java versions, which affects the driver's consumption of the Apache Arrow result format via the Apache Arrow library. The JDBC driver is used in partner solutions where partners do not control the runtime environment, so the workaround of setting JVM arguments is not feasible.
Testing
Tests are added in other stacked PRs.
Additional Notes to the Reviewer
It's a stacked PR.