Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -234,25 +234,40 @@ private UserCodeClassLoader getOrResolveClassLoader(
verifyIsNotReleased();

if (resolvedClassLoader == null) {
boolean systemClassLoader =
wrapsSystemClassLoader && libraries.isEmpty() && classPaths.isEmpty();
resolvedClassLoader =
new ResolvedClassLoader(
systemClassLoader
? ClassLoader.getSystemClassLoader()
: createUserCodeClassLoader(
jobId, libraries, classPaths),
libraries,
classPaths,
systemClassLoader);
resolvedClassLoader = createResolvedClassLoader(libraries, classPaths);
} else {
resolvedClassLoader.verifyClassLoader(libraries, classPaths);
try {
resolvedClassLoader.verifyClassLoader(libraries, classPaths);
} catch (IllegalStateException e) {
LOG.warn(
"Library cache entry for job {} has a classloader resolved with different "
+ "library BLOBs than requested. This can happen during JobManager "
+ "failover. Re-creating the classloader with the new blob keys.",
jobId,
e);
resolvedClassLoader = createResolvedClassLoader(libraries, classPaths);
}
}

return resolvedClassLoader;
}
}

@GuardedBy("lockObject")
private ResolvedClassLoader createResolvedClassLoader(
Collection<PermanentBlobKey> libraries, Collection<URL> classPaths)
throws IOException {
boolean systemClassLoader =
wrapsSystemClassLoader && libraries.isEmpty() && classPaths.isEmpty();
return new ResolvedClassLoader(
systemClassLoader
? ClassLoader.getSystemClassLoader()
: createUserCodeClassLoader(jobId, libraries, classPaths),
libraries,
classPaths,
systemClassLoader);
}

@GuardedBy("lockObject")
private URLClassLoader createUserCodeClassLoader(
JobID jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeFalse;
import static org.junit.Assume.assumeTrue;

/** Tests for {@link BlobLibraryCacheManager}. */
Expand Down Expand Up @@ -145,18 +146,16 @@ public void testLibraryCacheManagerDifferentJobsCleanup() throws Exception {

try {
classLoaderLeaseJob2.getOrResolveClassLoader(keys1, Collections.<URL>emptyList());
fail("Should fail with an IllegalStateException");
} catch (IllegalStateException e) {
// that's what we want
fail("Should fail with an IOException");
} catch (IOException e) {
assertTrue(e.getMessage(), e.getMessage().contains("Failed to fetch BLOB"));
}

try {
classLoaderLeaseJob2.getOrResolveClassLoader(
keys2, Collections.singletonList(new URL("file:///tmp/does-not-exist")));
fail("Should fail with an IllegalStateException");
} catch (IllegalStateException e) {
// that's what we want
}
final UserCodeClassLoader recreatedClassLoader =
classLoaderLeaseJob2.getOrResolveClassLoader(
keys2,
Collections.singletonList(new URL("file:///tmp/does-not-exist")));
assertThat(recreatedClassLoader, not(sameInstance(classLoader2)));

assertEquals(2, libCache.getNumberOfManagedJobs());
assertEquals(1, libCache.getNumberOfReferenceHolders(jobId1));
Expand Down Expand Up @@ -268,21 +267,15 @@ public void testLibraryCacheManagerCleanup() throws Exception {
classLoaderLease2.getOrResolveClassLoader(keys, Collections.emptyList());
assertThat(classLoader1, sameInstance(classLoader2));

try {
classLoaderLease1.getOrResolveClassLoader(
Collections.emptyList(), Collections.emptyList());
fail("Should fail with an IllegalStateException");
} catch (IllegalStateException e) {
// that's what we want
}
final UserCodeClassLoader recreatedClassLoader1 =
classLoaderLease1.getOrResolveClassLoader(
Collections.emptyList(), Collections.emptyList());
assertThat(recreatedClassLoader1, not(sameInstance(classLoader1)));

try {
classLoaderLease1.getOrResolveClassLoader(
keys, Collections.singletonList(new URL("file:///tmp/does-not-exist")));
fail("Should fail with an IllegalStateException");
} catch (IllegalStateException e) {
// that's what we want
}
final UserCodeClassLoader recreatedClassLoader2 =
classLoaderLease1.getOrResolveClassLoader(
keys, Collections.singletonList(new URL("file:///tmp/does-not-exist")));
assertThat(recreatedClassLoader2, not(sameInstance(recreatedClassLoader1)));

assertEquals(1, libCache.getNumberOfManagedJobs());
assertEquals(2, libCache.getNumberOfReferenceHolders(jobId));
Expand Down Expand Up @@ -613,6 +606,94 @@ public void releaseUserCodeClassLoader_willRegisterOnce()
releaseHookLatch.await();
}

@Test
public void classloaderIsRecreatedWhenBlobKeysChangeForSameJob() throws Exception {
JobID jobId = new JobID();
BlobServer server = null;
PermanentBlobCache cache = null;
BlobLibraryCacheManager libCache = null;

final byte[] content = new byte[] {1, 2, 3, 4, 5, 6, 7, 8};

try {
Configuration config = new Configuration();
config.set(BlobServerOptions.CLEANUP_INTERVAL, 1_000_000L);

server = new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore());
server.start();
InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort());
cache =
new PermanentBlobCache(
config,
temporaryFolder.newFolder(),
new VoidBlobStore(),
serverAddress);

PermanentBlobKey key1 = server.putPermanent(jobId, content);
PermanentBlobKey key2 = server.putPermanent(jobId, content);

libCache = createBlobLibraryCacheManager(cache);
cache.registerJob(jobId);

final LibraryCacheManager.ClassLoaderLease lease =
libCache.registerClassLoaderLease(jobId);
final UserCodeClassLoader classLoader1 =
lease.getOrResolveClassLoader(
Collections.singletonList(key1), Collections.emptyList());

final UserCodeClassLoader classLoader2 =
lease.getOrResolveClassLoader(
Collections.singletonList(key2), Collections.emptyList());

assertThat(classLoader2, not(sameInstance(classLoader1)));

lease.release();
} finally {
if (libCache != null) {
libCache.shutdown();
}
if (cache != null) {
cache.close();
}
if (server != null) {
server.close();
}
}
}

@Test
public void classloaderRecreationDoesNotCloseOldClassloader() throws Exception {
assumeFalse(wrapsSystemClassLoader);
final TestingClassLoader classLoader1 = new TestingClassLoader();
final TestingClassLoader classLoader2 = new TestingClassLoader();
final TestingClassLoader[] classLoaders = {classLoader1, classLoader2};
final int[] callCount = {0};

final BlobLibraryCacheManager libraryCacheManager =
new TestingBlobLibraryCacheManagerBuilder()
.setClassLoaderFactory(
ignored -> {
int idx = callCount[0]++;
return classLoaders[Math.min(idx, classLoaders.length - 1)];
})
.build();

final JobID jobId = new JobID();
final LibraryCacheManager.ClassLoaderLease lease =
libraryCacheManager.registerClassLoaderLease(jobId);

lease.getOrResolveClassLoader(Collections.emptyList(), Collections.emptyList());

lease.getOrResolveClassLoader(
Collections.emptyList(),
Collections.singletonList(new URL("file:///tmp/does-not-exist")));

assertFalse(
"Old classloader should not be closed during re-creation", classLoader1.isClosed());

lease.release();
}

private BlobLibraryCacheManager createSimpleBlobLibraryCacheManager() throws IOException {
return new TestingBlobLibraryCacheManagerBuilder().build();
}
Expand Down