Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions lang/java/mapred/src/main/java/org/apache/avro/mapred/FsInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,17 @@
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.avro.file.SeekableInput;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/** Adapt an {@link FSDataInputStream} to {@link SeekableInput}. */
public class FsInput implements Closeable, SeekableInput {
private final FSDataInputStream stream;
Expand All @@ -40,8 +45,19 @@ public FsInput(Path path, Configuration conf) throws IOException {

/** Construct given a path and a {@code FileSystem}. */
public FsInput(Path path, FileSystem fileSystem) throws IOException {
this.len = fileSystem.getFileStatus(path).getLen();
this.stream = fileSystem.open(path);
final FileStatus st = fileSystem.getFileStatus(path);
this.len = st.getLen();
// use the hadoop 3.3+ openFile API, passing in status
// and read policy. object stores can use these to
// optimize read performance and save on a HEAD request when opening
// a file.
final FutureDataInputStreamBuilder builder = fileSystem.openFile(path).opt(FS_OPTION_OPENFILE_READ_POLICY,
"avro, sequential, adaptive");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these read policies are going to be used by file sysem to understand the user intended pattern. Do we have a way to standardise these values via means of constants or an enum class that can be used across projects?

I might be missing a similar thing already existing in hadoop. Please let me know if its already there.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do have them in hadoop, but that hard codes to specific versions (the "avro" one is fairly recent. Better for me to put them into an avro class

if (path.equals(st.getPath())) {
// set the file status if this isn't any wrapped filesystem.
builder.withFileStatus(st);
}
this.stream = awaitFuture(builder.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
Expand All @@ -40,6 +42,7 @@
public class TestFsInput {
private static File file;
private static final String FILE_CONTENTS = "abcdefghijklmnopqrstuvwxyz";
public static final int LENGTH = FILE_CONTENTS.length();
private Configuration conf;
private FsInput fsInput;

Expand Down Expand Up @@ -71,7 +74,7 @@ void configurationConstructor() throws Exception {
try (FsInput in = new FsInput(new Path(file.getPath()), conf)) {
int expectedByteCount = 1;
byte[] readBytes = new byte[expectedByteCount];
int actualByteCount = fsInput.read(readBytes, 0, expectedByteCount);
int actualByteCount = in.read(readBytes, 0, expectedByteCount);
assertThat(actualByteCount, is(equalTo(expectedByteCount)));
}
}
Expand All @@ -83,7 +86,7 @@ void fileSystemConstructor() throws Exception {
try (FsInput in = new FsInput(path, fs)) {
int expectedByteCount = 1;
byte[] readBytes = new byte[expectedByteCount];
int actualByteCount = fsInput.read(readBytes, 0, expectedByteCount);
int actualByteCount = in.read(readBytes, 0, expectedByteCount);
assertThat(actualByteCount, is(equalTo(expectedByteCount)));
}
}
Expand Down Expand Up @@ -123,4 +126,57 @@ void tell() throws Exception {
assertThat(actualTellPos, is(equalTo(expectedTellPos)));
}

/**
* How does a negative seek manifest itself? It's expected to fail on the seek()
* call and not move the file position. Most streams raise EOFExceotion; some
* raise IllegalArgumentException instead.
*/
@Test
void seekNegative() throws Exception {
fsInput.seek(1);
assertThrows(Exception.class, () -> fsInput.seek(-1));
assertThat("file position after a negative seek", fsInput.tell(), is(equalTo(1L)));
}

/**
* Seek past the EOF then read.
*/
@Test
void seekPastEOF() {
assertThrows(Exception.class, () -> fsInput.seek(LENGTH + 2));
}

/**
* Read to the exact EOF then do a read(), which is required to return -1 to
* indicate the EOF has been reached.
*/
@Test
void readAtEOF() throws Exception {
fsInput.seek(LENGTH);
final int l = 8;
byte[] readBytes = new byte[l];
assertThat("bytes read from beyond EOF", fsInput.read(readBytes, 0, l), is(equalTo(-1)));
}

/**
* Read across the end of file. All data available up to the EOF is returned.
*/
@Test
void readAcrossEOF() throws Exception {
fsInput.seek(LENGTH - 2);
final int l = 8;
byte[] readBytes = new byte[l];
assertThat("bytes read from beyond EOF", fsInput.read(readBytes, 0, l), is(equalTo(2)));
assertThat("bytes read from beyond EOF", fsInput.tell(), is(equalTo((long) LENGTH)));
}

/**
* Delete the file before trying to open it.
*/
@Test
void openMissingFile() {
file.delete();
assertThrows(FileNotFoundException.class, () -> new FsInput(new Path(file.getPath()), conf).close());
}

}
Loading