Skip to content

Commit ff8deab

Browse files
committed
Add ReadBufferDataHandle
This Handle acts as a transparent buffer which wraps any other DataHandle and buffers reads from that handle.
1 parent 42567de commit ff8deab

File tree

2 files changed

+444
-0
lines changed

2 files changed

+444
-0
lines changed
Lines changed: 300 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,300 @@
1+
/*
2+
* #%L
3+
* SciJava Common shared library for SciJava software.
4+
* %%
5+
* Copyright (C) 2009 - 2017 Board of Regents of the University of
6+
* Wisconsin-Madison, Broad Institute of MIT and Harvard, Max Planck
7+
* Institute of Molecular Cell Biology and Genetics, University of
8+
* Konstanz, and KNIME GmbH.
9+
* %%
10+
* Redistribution and use in source and binary forms, with or without
11+
* modification, are permitted provided that the following conditions are met:
12+
*
13+
* 1. Redistributions of source code must retain the above copyright notice,
14+
* this list of conditions and the following disclaimer.
15+
* 2. Redistributions in binary form must reproduce the above copyright notice,
16+
* this list of conditions and the following disclaimer in the documentation
17+
* and/or other materials provided with the distribution.
18+
*
19+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE
23+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29+
* POSSIBILITY OF SUCH DAMAGE.
30+
* #L%
31+
*/
32+
33+
package org.scijava.io.handle;
34+
35+
import java.io.IOException;
36+
import java.util.ArrayDeque;
37+
import java.util.ArrayList;
38+
import java.util.Arrays;
39+
import java.util.Deque;
40+
import java.util.HashMap;
41+
import java.util.List;
42+
import java.util.Map;
43+
44+
import org.scijava.io.location.Location;
45+
46+
/**
47+
* Read-only buffered {@link DataHandle}. It buffers the underlying handle into
48+
* a fixed number of pages, swapping them out when necessary.
49+
*/
50+
public class ReadBufferDataHandle extends AbstractHigherOrderHandle<Location> {
51+
52+
private static final int DEFAULT_PAGE_SIZE = 10_000;
53+
private static final int DEFAULT_NUM_PAGES = 10;
54+
55+
private final int pageSize;
56+
private final List<byte[]> pages;
57+
private final int[] slotToPage;
58+
private final LRUReplacementStrategy replacementStrategy;
59+
private final Map<Integer, Integer> pageToSlot;
60+
61+
private long offset = 0l;
62+
private byte[] currentPage;
63+
private int currentPageID = -1;
64+
65+
/**
66+
* Creates a {@link ReadBufferDataHandle} wrapping the provided handle using the
67+
* default values for the size of the pages ({@value #DEFAULT_PAGE_SIZE} byte)
68+
* and number of pages ({@link #DEFAULT_NUM_PAGES}).
69+
*
70+
* @param handle
71+
* the handle to wrap
72+
*/
73+
public ReadBufferDataHandle(final DataHandle<Location> handle) {
74+
this(handle, DEFAULT_PAGE_SIZE);
75+
}
76+
77+
/**
78+
* Creates a {@link ReadBufferDataHandle} wrapping the provided handle using the
79+
* default value for the number of pages ({@link #DEFAULT_NUM_PAGES}).
80+
*
81+
* @param handle
82+
* the handle to wrap
83+
* @param pageSize
84+
* the size of the used pages
85+
*/
86+
public ReadBufferDataHandle(final DataHandle<Location> handle, final int pageSize) {
87+
this(handle, pageSize, DEFAULT_NUM_PAGES);
88+
}
89+
90+
/**
91+
* Creates a {@link ReadBufferDataHandle} wrapping the provided handle.
92+
*
93+
* @param handle
94+
* the handle to wrap
95+
* @param pageSize
96+
* the size of the used pages
97+
* @param numPages
98+
* the number of pages to use
99+
*/
100+
public ReadBufferDataHandle(final DataHandle<Location> handle, final int pageSize, final int numPages) {
101+
super(handle);
102+
this.pageSize = pageSize;
103+
104+
// init maps
105+
slotToPage = new int[numPages];
106+
Arrays.fill(slotToPage, -1);
107+
108+
pages = new ArrayList<>(numPages);
109+
for (int i = 0; i < numPages; i++) {
110+
pages.add(null);
111+
}
112+
113+
pageToSlot = new HashMap<>();
114+
replacementStrategy = new LRUReplacementStrategy(numPages);
115+
}
116+
117+
/**
118+
* Ensures that the byte at the given offset is buffered, and sets the current
119+
* page to be the one containing the specified location.
120+
*/
121+
private void ensureBuffered(final long globalOffset) throws IOException {
122+
ensureOpen();
123+
final int pageID = (int) (globalOffset / pageSize);
124+
if (pageID == currentPageID)
125+
return;
126+
127+
final int slotID = pageToSlot.computeIfAbsent(pageID, replacementStrategy::pickVictim);
128+
final int inSlotID = slotToPage[slotID];
129+
130+
if (inSlotID != pageID) { // desired page is not buffered
131+
// update the mappings
132+
slotToPage[slotID] = pageID;
133+
pageToSlot.put(pageID, slotID);
134+
pageToSlot.put(inSlotID, null);
135+
136+
// read the page
137+
currentPage = readPage(pageID, slotID);
138+
} else {
139+
currentPage = pages.get(slotID);
140+
}
141+
replacementStrategy.accessed(slotID);
142+
currentPageID = pageID;
143+
}
144+
145+
/**
146+
* Reads the page with the id <code>pageID</code> into the slot with the id
147+
* <code>slotID</code>.
148+
*
149+
* @param pageID
150+
* the id of the page to read
151+
* @param slotID
152+
* the id of the slot to read the page into
153+
* @return the read page
154+
* @throws IOException
155+
* if the reading fails
156+
*/
157+
private byte[] readPage(final int pageID, final int slotID) throws IOException {
158+
replacementStrategy.accessed(slotID);
159+
byte[] page = pages.get(slotID);
160+
if (page == null) {
161+
// lazy initialization
162+
page = new byte[pageSize];
163+
pages.set(slotID, page);
164+
}
165+
166+
final long startOfPage = pageID * (long) pageSize;
167+
if (handle().offset() != startOfPage) {
168+
handle().seek(startOfPage);
169+
}
170+
handle().read(page);
171+
return page;
172+
}
173+
174+
/**
175+
* Calculates the offset in the current page for the given global offset
176+
*/
177+
private int globalToLocalOffset(final long off) {
178+
return (int) off % pageSize;
179+
}
180+
181+
@Override
182+
public void seek(final long pos) throws IOException {
183+
this.offset = pos;
184+
}
185+
186+
@Override
187+
public int read(final byte[] b, final int targetOffset, final int len)
188+
throws IOException
189+
{
190+
if (len == 0) return 0;
191+
192+
// the last position we will read
193+
final long endPos = offset + len;
194+
195+
// the number of bytes we plan to read
196+
final int readLength = (int) (endPos < length() ? len : length() - offset);
197+
198+
int read = 0; // the number of bytes we have read
199+
int localTargetOff = targetOffset;
200+
201+
while (read < readLength) {
202+
ensureBuffered(offset);
203+
204+
// calculate local offsets
205+
final int pageOffset = globalToLocalOffset(offset);
206+
int localLength = pageSize - pageOffset;
207+
if (read + localLength > readLength) {
208+
localLength = readLength - read;
209+
}
210+
211+
// copy the data
212+
System.arraycopy(currentPage, pageOffset, b, localTargetOff, localLength);
213+
214+
// update offsets
215+
read += localLength;
216+
offset += localLength;
217+
localTargetOff += localLength;
218+
}
219+
// return -1 if we tried to read at least one byte but failed
220+
return read != 0 ? read : -1;
221+
}
222+
223+
@Override
224+
public byte readByte() throws IOException {
225+
ensureBuffered(offset);
226+
return currentPage[globalToLocalOffset(offset++)];
227+
}
228+
229+
@Override
230+
public boolean isReadable() {
231+
return true;
232+
}
233+
234+
@Override
235+
public long offset() throws IOException {
236+
return offset;
237+
}
238+
239+
@Override
240+
protected void cleanup() {
241+
pages.clear();
242+
currentPage = null;
243+
}
244+
245+
@Override
246+
public void write(final int b) throws IOException {
247+
throw DataHandles.readOnlyException();
248+
}
249+
250+
@Override
251+
public void write(final byte[] b, final int off, final int len) throws IOException {
252+
throw DataHandles.readOnlyException();
253+
}
254+
255+
@Override
256+
public void setLength(final long length) throws IOException {
257+
throw DataHandles.readOnlyException();
258+
}
259+
260+
/**
261+
* Simple strategy to pick the slot that get's evicted from the cache. This
262+
* strategy always picks the least recently used slot.
263+
*/
264+
private class LRUReplacementStrategy {
265+
266+
private final Deque<Integer> queue;
267+
268+
/**
269+
* Creates a {@link LRUReplacementStrategy} with the specified number of slots.
270+
*
271+
* @param numSlots
272+
* the number of slots to use
273+
*/
274+
public LRUReplacementStrategy(final int numSlots) {
275+
queue = new ArrayDeque<>(numSlots);
276+
277+
// fill the que
278+
for (int i = 0; i < numSlots; i++) {
279+
queue.add(i);
280+
}
281+
}
282+
283+
/**
284+
* Notifies this strategy that a slot has been accessed, pushing it to the end
285+
* of the queue.
286+
*
287+
* @param slotID
288+
* the id of the slot that has been accessed
289+
*/
290+
public void accessed(final int slotID) {
291+
// put accessed element to the end of the que
292+
queue.remove(slotID);
293+
queue.add(slotID);
294+
}
295+
296+
public int pickVictim(final int pageID) {
297+
return queue.peek();
298+
}
299+
}
300+
}

0 commit comments

Comments
 (0)