Skip to content

Commit 42aef91

Browse files
committed
Xor: implement stream serialization
There are already methods operating on ByteBuffer, however versions taking streams allow to avoid data copying, and more directly compatible with various network APIs. Also, for reference, Guava Bloom filter implementation uses streams. There's a bit of code duplication as the result. Sadly, there's no standard class to wrap ByteBuffer in a stream, and pulling in an external dependency seems like an overkill here.
1 parent e35ab0d commit 42aef91

7 files changed

Lines changed: 209 additions & 6 deletions

File tree

fastfilter/src/main/java/org/fastfilter/Filter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.fastfilter;
22

3+
import java.io.IOException;
4+
import java.io.OutputStream;
35
import java.nio.ByteBuffer;
46

57
/**
@@ -85,4 +87,15 @@ default int getSerializedSize() {
8587
default void serialize(ByteBuffer buffer) {
8688
throw new UnsupportedOperationException();
8789
}
90+
91+
/**
92+
* Serializes the filter state into the provided {@code OutputStream}.
93+
*
94+
* @param out the output stream where the serialized state of the filter will be written
95+
* @throws IOException if writing to the stream fails
96+
* @throws UnsupportedOperationException if the operation is not supported by the filter implementation
97+
*/
98+
default void serialize(OutputStream out) throws IOException {
99+
throw new UnsupportedOperationException();
100+
}
88101
}

fastfilter/src/main/java/org/fastfilter/xor/Xor16.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49

510
import org.fastfilter.Filter;
@@ -199,4 +204,27 @@ public static Xor16 deserialize(ByteBuffer buffer) {
199204

200205
return new Xor16(blockLength, bitCount, seed, fingerprints);
201206
}
207+
208+
public void serialize(OutputStream out) throws IOException {
209+
DataOutputStream dout = new DataOutputStream(out);
210+
dout.writeInt(blockLength);
211+
dout.writeLong(seed);
212+
dout.writeInt(fingerprints.length);
213+
for (final short fp : fingerprints) {
214+
dout.writeShort(fp);
215+
}
216+
}
217+
218+
public static Xor16 deserialize(InputStream in) throws IOException {
219+
DataInputStream din = new DataInputStream(in);
220+
final int blockLength = din.readInt();
221+
final long seed = din.readLong();
222+
final int len = din.readInt();
223+
final short[] fingerprints = new short[len];
224+
for (int i = 0; i < len; i++) {
225+
fingerprints[i] = din.readShort();
226+
}
227+
final int bitCount = len * BITS_PER_FINGERPRINT;
228+
return new Xor16(blockLength, bitCount, seed, fingerprints);
229+
}
202230
}

fastfilter/src/main/java/org/fastfilter/xor/Xor8.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,4 +241,22 @@ public static Xor8 deserialize(ByteBuffer buffer) {
241241

242242
return new Xor8(size, seed, fingerprints);
243243
}
244+
245+
public void serialize(OutputStream out) throws IOException {
246+
DataOutputStream dout = new DataOutputStream(out);
247+
dout.writeInt(size);
248+
dout.writeLong(seed);
249+
dout.writeInt(fingerprints.length);
250+
dout.write(fingerprints);
251+
}
252+
253+
public static Xor8 deserialize(InputStream in) throws IOException {
254+
DataInputStream din = new DataInputStream(in);
255+
final int size = din.readInt();
256+
final long seed = din.readLong();
257+
final int len = din.readInt();
258+
final byte[] fingerprints = new byte[len];
259+
din.readFully(fingerprints);
260+
return new Xor8(size, seed, fingerprints);
261+
}
244262
}

fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse16.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49
import java.util.Arrays;
510
import org.fastfilter.Filter;
@@ -325,4 +330,29 @@ public static XorBinaryFuse16 deserialize(ByteBuffer buffer) {
325330

326331
return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
327332
}
333+
334+
public void serialize(OutputStream out) throws IOException {
335+
DataOutputStream dout = new DataOutputStream(out);
336+
dout.writeInt(segmentLength);
337+
dout.writeInt(segmentCountLength);
338+
dout.writeLong(seed);
339+
dout.writeInt(fingerprints.length);
340+
for (final short fp : fingerprints) {
341+
dout.writeShort(fp);
342+
}
343+
}
344+
345+
public static XorBinaryFuse16 deserialize(InputStream in) throws IOException {
346+
DataInputStream din = new DataInputStream(in);
347+
final int segmentLength = din.readInt();
348+
final int segmentCountLength = din.readInt();
349+
final long seed = din.readLong();
350+
final int len = din.readInt();
351+
final short[] fingerprints = new short[len];
352+
for (int i = 0; i < len; i++) {
353+
fingerprints[i] = din.readShort();
354+
}
355+
final int segmentCount = segmentCountLength / segmentLength;
356+
return new XorBinaryFuse16(segmentCount, segmentLength, seed, fingerprints);
357+
}
328358
}

fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse32.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49
import java.util.Arrays;
510

@@ -313,4 +318,29 @@ public static XorBinaryFuse32 deserialize(ByteBuffer buffer) {
313318

314319
return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
315320
}
321+
322+
public void serialize(OutputStream out) throws IOException {
323+
DataOutputStream dout = new DataOutputStream(out);
324+
dout.writeInt(segmentLength);
325+
dout.writeInt(segmentCountLength);
326+
dout.writeLong(seed);
327+
dout.writeInt(fingerprints.length);
328+
for (final int fp : fingerprints) {
329+
dout.writeInt(fp);
330+
}
331+
}
332+
333+
public static XorBinaryFuse32 deserialize(InputStream in) throws IOException {
334+
DataInputStream din = new DataInputStream(in);
335+
final int segmentLength = din.readInt();
336+
final int segmentCountLength = din.readInt();
337+
final long seed = din.readLong();
338+
final int len = din.readInt();
339+
final int[] fingerprints = new int[len];
340+
for (int i = 0; i < len; i++) {
341+
fingerprints[i] = din.readInt();
342+
}
343+
final int segmentCount = segmentCountLength / segmentLength;
344+
return new XorBinaryFuse32(segmentCount, segmentLength, seed, fingerprints);
345+
}
316346
}

fastfilter/src/main/java/org/fastfilter/xor/XorBinaryFuse8.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.fastfilter.xor;
22

3+
import java.io.DataInputStream;
4+
import java.io.DataOutputStream;
5+
import java.io.IOException;
6+
import java.io.InputStream;
7+
import java.io.OutputStream;
38
import java.nio.ByteBuffer;
49
import java.util.Arrays;
510

@@ -321,4 +326,25 @@ public static XorBinaryFuse8 deserialize(ByteBuffer buffer) {
321326

322327
return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
323328
}
329+
330+
public void serialize(OutputStream out) throws IOException {
331+
DataOutputStream dout = new DataOutputStream(out);
332+
dout.writeInt(segmentLength);
333+
dout.writeInt(segmentCountLength);
334+
dout.writeLong(seed);
335+
dout.writeInt(fingerprints.length);
336+
dout.write(fingerprints);
337+
}
338+
339+
public static XorBinaryFuse8 deserialize(InputStream in) throws IOException {
340+
DataInputStream din = new DataInputStream(in);
341+
final int segmentLength = din.readInt();
342+
final int segmentCountLength = din.readInt();
343+
final long seed = din.readLong();
344+
final int len = din.readInt();
345+
final byte[] fingerprints = new byte[len];
346+
din.readFully(fingerprints);
347+
final int segmentCount = segmentCountLength / segmentLength;
348+
return new XorBinaryFuse8(segmentCount, segmentLength, seed, fingerprints);
349+
}
324350
}

fastfilter/src/test/java/org/fastfilter/xor/SerializationTest.java

Lines changed: 64 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
import static org.junit.Assert.assertTrue;
66
import static org.junit.Assert.fail;
77

8+
import java.io.ByteArrayInputStream;
9+
import java.io.ByteArrayOutputStream;
10+
import java.io.IOException;
11+
import java.io.InputStream;
812
import java.nio.ByteBuffer;
913
import java.util.List;
1014
import java.util.function.Function;
@@ -20,28 +24,36 @@ public class SerializationTest {
2024
private final String filterName;
2125
private final Function<long[], Filter> constructor;
2226
private final Function<ByteBuffer, Filter> deserializer;
27+
private final StreamDeserializer streamDeserializer;
2328

2429
public SerializationTest(String filterName,
2530
Function<long[], Filter> constructor,
26-
Function<ByteBuffer, Filter> deserializer) {
31+
Function<ByteBuffer, Filter> deserializer,
32+
StreamDeserializer streamDeserializer) {
2733
this.filterName = filterName;
2834
this.constructor = constructor;
2935
this.deserializer = deserializer;
36+
this.streamDeserializer = streamDeserializer;
3037
}
3138

3239
@Parameters(name = "{0}")
3340
public static List<Object[]> filters() {
3441
return List.of(
3542
new Object[] {"Xor8", (Function<long[], Filter>) Xor8::construct,
36-
(Function<ByteBuffer, Filter>) Xor8::deserialize},
43+
(Function<ByteBuffer, Filter>) Xor8::deserialize,
44+
(StreamDeserializer) Xor8::deserialize},
3745
new Object[] {"Xor16", (Function<long[], Filter>) Xor16::construct,
38-
(Function<ByteBuffer, Filter>) Xor16::deserialize},
46+
(Function<ByteBuffer, Filter>) Xor16::deserialize,
47+
(StreamDeserializer) Xor16::deserialize},
3948
new Object[] {"XorBinaryFuse8", (Function<long[], Filter>) XorBinaryFuse8::construct,
40-
(Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize},
49+
(Function<ByteBuffer, Filter>) XorBinaryFuse8::deserialize,
50+
(StreamDeserializer) XorBinaryFuse8::deserialize},
4151
new Object[] {"XorBinaryFuse16", (Function<long[], Filter>) XorBinaryFuse16::construct,
42-
(Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize},
52+
(Function<ByteBuffer, Filter>) XorBinaryFuse16::deserialize,
53+
(StreamDeserializer) XorBinaryFuse16::deserialize},
4354
new Object[] {"XorBinaryFuse32", (Function<long[], Filter>) XorBinaryFuse32::construct,
44-
(Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize}
55+
(Function<ByteBuffer, Filter>) XorBinaryFuse32::deserialize,
56+
(StreamDeserializer) XorBinaryFuse32::deserialize}
4557
);
4658
}
4759

@@ -85,6 +97,52 @@ public void shouldSerializeAndDeserializeMediumFilter() {
8597
assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
8698
}
8799

100+
@Test
101+
public void shouldSerializeAndDeserializeMediumFilterFromStream() throws IOException {
102+
// Arrange
103+
final var keys = new long[]{100L, 200L, 300L, 400L, 500L, 600L, 700L, 800L, 900L, 1000L};
104+
final var originalFilter = constructor.apply(keys);
105+
final var buffer = ByteBuffer.allocate(originalFilter.getSerializedSize());
106+
107+
// Act
108+
originalFilter.serialize(buffer);
109+
final var input = new ByteArrayInputStream(buffer.array());
110+
final var deserializedFilter = streamDeserializer.deserialize(input);
111+
112+
// Assert
113+
for (final long key : keys) {
114+
assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
115+
deserializedFilter.mayContain(key));
116+
}
117+
assertFalse("Key 50L should not be in " + filterName + " filter", deserializedFilter.mayContain(50L));
118+
assertFalse("Key 1500L should not be in " + filterName + " filter", deserializedFilter.mayContain(1500L));
119+
}
120+
121+
@Test
122+
public void shouldSerializeToStreamAndDeserializeFromByteBuffer() throws IOException {
123+
// Arrange
124+
final var keys = new long[]{10L, 20L, 30L, 40L, 50L, 60L, 70L, 80L};
125+
final var originalFilter = constructor.apply(keys);
126+
final var out = new ByteArrayOutputStream();
127+
128+
// Act
129+
originalFilter.serialize(out);
130+
final var buffer = ByteBuffer.wrap(out.toByteArray());
131+
final var deserializedFilter = deserializer.apply(buffer);
132+
133+
// Assert
134+
for (final long key : keys) {
135+
assertTrue("Key " + key + " should be present in deserialized " + filterName + " filter",
136+
deserializedFilter.mayContain(key));
137+
}
138+
assertFalse("Key 15L should not be in " + filterName + " filter", deserializedFilter.mayContain(15L));
139+
}
140+
141+
@FunctionalInterface
142+
private interface StreamDeserializer {
143+
Filter deserialize(InputStream in) throws IOException;
144+
}
145+
88146
@Test
89147
public void shouldSerializeAndDeserializeLargeFilter() {
90148
// Arrange

0 commit comments

Comments
 (0)