Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.common;

import java.time.Duration;

/**
* Utility for parsing time values from configuration strings.
* Consolidates the logic previously duplicated in
* {@code ZeppelinConfiguration.timeUnitToMill()} and
* {@code TimeoutLifecycleManager.parseTimeValue()}.
*/
public final class ConfigTimeUtils {
private ConfigTimeUtils() {}

/**
* Parses a configuration time string to milliseconds.
*
* <p>Supported formats:
* <ul>
* <li>Plain integer: treated as milliseconds (e.g. {@code "60000"})</li>
* <li>{@code "ms"} suffix: parsed as milliseconds (e.g. {@code "500ms"})</li>
* <li>ISO 8601 duration component: H, M, S or combinations (e.g. {@code "1H"}, {@code "30M"},
* {@code "1H30M"})</li>
* </ul>
*
* @param value the time string from a configuration property
* @return the equivalent duration in milliseconds
* @throws IllegalArgumentException if {@code value} is null or empty
* @throws java.time.format.DateTimeParseException if the value is not a recognised format
*/
public static long parseTimeValueToMillis(String value) {
if (value == null || value.trim().isEmpty()) {
throw new IllegalArgumentException("Time value must not be null or empty");
}
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
if (value.endsWith("ms")) {
return Long.parseLong(value.substring(0, value.length() - 2));
}
return Duration.parse("PT" + value).toMillis();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.common;

/**
* Canonical string constants for interpreter-related configuration keys.
* Use these instead of plain string literals to catch typos at compile time.
*/
public final class InterpreterConfigKeys {
private InterpreterConfigKeys() {}

public static final String INTERPRETER_CONNECTION_POOL_SIZE =
"zeppelin.interpreter.connection.poolsize";
public static final String INTERPRETER_LIFECYCLE_MANAGER_CLASS =
"zeppelin.interpreter.lifecyclemanager.class";
public static final String INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL =
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval";
public static final String INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD =
"zeppelin.interpreter.lifecyclemanager.timeout.threshold";
public static final String PROXY_URL = "zeppelin.proxy.url";
public static final String PROXY_USER = "zeppelin.proxy.user";
public static final String PROXY_PASSWORD = "zeppelin.proxy.password";
public static final String INTERPRETER_DEP_MVNREPO = "zeppelin.interpreter.dep.mvnRepo";
public static final String SCHEDULER_THREADPOOL_SIZE = "zeppelin.scheduler.threadpool.size";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.common;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

class ConfigTimeUtilsTest {

@Test
void plainNumberReturnedAsMillis() {
assertEquals(60000L, ConfigTimeUtils.parseTimeValueToMillis("60000"));
}

@Test
void msSuffixParsed() {
assertEquals(500L, ConfigTimeUtils.parseTimeValueToMillis("500ms"));
}

@Test
void hourUnitParsed() {
assertEquals(3600000L, ConfigTimeUtils.parseTimeValueToMillis("1H"));
}

@Test
void minuteUnitParsed() {
assertEquals(1800000L, ConfigTimeUtils.parseTimeValueToMillis("30M"));
}

@Test
void secondUnitParsed() {
assertEquals(10000L, ConfigTimeUtils.parseTimeValueToMillis("10S"));
}

@Test
void compoundDurationParsed() {
assertEquals(5400000L, ConfigTimeUtils.parseTimeValueToMillis("1H30M"));
}

@Test
void defaultCheckIntervalCompatible() {
assertEquals(60000L, ConfigTimeUtils.parseTimeValueToMillis("60000"));
}

@Test
void defaultThresholdCompatible() {
assertEquals(3600000L, ConfigTimeUtils.parseTimeValueToMillis("3600000"));
}

@Test
void nullInputThrows() {
assertThrows(IllegalArgumentException.class,
() -> ConfigTimeUtils.parseTimeValueToMillis(null));
}

@Test
void emptyInputThrows() {
assertThrows(IllegalArgumentException.class,
() -> ConfigTimeUtils.parseTimeValueToMillis(""));
}

@Test
void invalidFormatThrows() {
assertThrows(Exception.class,
() -> ConfigTimeUtils.parseTimeValueToMillis("abc"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.zeppelin.common;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

class InterpreterConfigKeysTest {

@Test
void interpreterConnectionPoolSizeKey() {
assertEquals("zeppelin.interpreter.connection.poolsize",
InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE);
}

@Test
void interpreterLifecycleManagerClassKey() {
assertEquals("zeppelin.interpreter.lifecyclemanager.class",
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_CLASS);
}

@Test
void interpreterLifecycleManagerTimeoutCheckIntervalKey() {
assertEquals("zeppelin.interpreter.lifecyclemanager.timeout.checkinterval",
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL);
}

@Test
void interpreterLifecycleManagerTimeoutThresholdKey() {
assertEquals("zeppelin.interpreter.lifecyclemanager.timeout.threshold",
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD);
}

@Test
void proxyUrlKey() {
assertEquals("zeppelin.proxy.url", InterpreterConfigKeys.PROXY_URL);
}

@Test
void proxyUserKey() {
assertEquals("zeppelin.proxy.user", InterpreterConfigKeys.PROXY_USER);
}

@Test
void proxyPasswordKey() {
assertEquals("zeppelin.proxy.password", InterpreterConfigKeys.PROXY_PASSWORD);
}

@Test
void interpreterDepMvnRepoKey() {
assertEquals("zeppelin.interpreter.dep.mvnRepo",
InterpreterConfigKeys.INTERPRETER_DEP_MVNREPO);
}

@Test
void schedulerThreadpoolSizeKey() {
assertEquals("zeppelin.scheduler.threadpool.size",
InterpreterConfigKeys.SCHEDULER_THREADPOOL_SIZE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@
package org.apache.zeppelin.interpreter.lifecycle;

import org.apache.thrift.TException;
import org.apache.zeppelin.common.ConfigTimeUtils;
import org.apache.zeppelin.common.InterpreterConfigKeys;
import org.apache.zeppelin.interpreter.LifecycleManager;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer;
import org.apache.zeppelin.scheduler.ExecutorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.ScheduledExecutorService;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
Expand All @@ -50,11 +51,11 @@ public class TimeoutLifecycleManager extends LifecycleManager {
public TimeoutLifecycleManager(Properties properties,
RemoteInterpreterServer remoteInterpreterServer) {
super(properties, remoteInterpreterServer);
long checkInterval = parseTimeValue(properties.getProperty(
"zeppelin.interpreter.lifecyclemanager.timeout.checkinterval",
String.valueOf(DEFAULT_CHECK_INTERVAL)));
long timeoutThreshold = parseTimeValue(properties.getProperty(
"zeppelin.interpreter.lifecyclemanager.timeout.threshold",
long checkInterval = ConfigTimeUtils.parseTimeValueToMillis(properties.getProperty(
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_CHECK_INTERVAL,
String.valueOf(DEFAULT_CHECK_INTERVAL)));
long timeoutThreshold = ConfigTimeUtils.parseTimeValueToMillis(properties.getProperty(
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_TIMEOUT_THRESHOLD,
String.valueOf(DEFAULT_TIMEOUT_THRESHOLD)));
ScheduledExecutorService checkScheduler = ExecutorFactory.singleton()
.createOrGetScheduled("TimeoutLifecycleManager", 1);
Expand All @@ -74,17 +75,6 @@ public TimeoutLifecycleManager(Properties properties,
timeoutThreshold);
}

static long parseTimeValue(String value) {
try {
return Long.parseLong(value);
} catch (NumberFormatException e) {
if (value.endsWith("ms")) {
return Long.parseLong(value.substring(0, value.length() - 2));
}
return Duration.parse("PT" + value).toMillis();
}
}

@Override
public void onInterpreterProcessStarted(String interpreterGroupId) {
LOGGER.info("Interpreter process: {} is started", interpreterGroupId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.zeppelin.helium.ApplicationLoader;
import org.apache.zeppelin.helium.HeliumAppAngularObjectRegistry;
import org.apache.zeppelin.helium.HeliumPackage;
import org.apache.zeppelin.common.InterpreterConfigKeys;
import org.apache.zeppelin.interpreter.Constants;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
Expand Down Expand Up @@ -211,7 +212,7 @@ public void init(Map<String, String> properties) throws InterpreterRPCException,

if (!isTest) {
int connectionPoolSize = Integer.parseInt(
zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100"));
zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE, "100"));
LOGGER.info("Creating RemoteInterpreterEventClient with connection pool size: {}",
connectionPoolSize);
intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
Expand Down Expand Up @@ -268,8 +269,8 @@ public boolean isRunning() {

private LifecycleManager createLifecycleManager() throws Exception {
String lifecycleManagerClass = zProperties.getProperty(
"zeppelin.interpreter.lifecyclemanager.class",
"org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager");
InterpreterConfigKeys.INTERPRETER_LIFECYCLE_MANAGER_CLASS,
"org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager");
Class<?> clazz = Class.forName(lifecycleManagerClass);
LOGGER.info("Creating interpreter lifecycle manager: {}", lifecycleManagerClass);
return (LifecycleManager) clazz.getConstructor(Properties.class, RemoteInterpreterServer.class)
Expand Down Expand Up @@ -335,10 +336,10 @@ public void createInterpreter(String interpreterGroupId, String sessionId, Strin
}

depLoader = new DependencyResolver(localRepoPath,
zProperties.getProperty("zeppelin.proxy.url"),
zProperties.getProperty("zeppelin.proxy.user"),
zProperties.getProperty("zeppelin.proxy.password"),
zProperties.getProperty("zeppelin.interpreter.dep.mvnRepo"));
zProperties.getProperty(InterpreterConfigKeys.PROXY_URL),
zProperties.getProperty(InterpreterConfigKeys.PROXY_USER),
zProperties.getProperty(InterpreterConfigKeys.PROXY_PASSWORD),
zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_DEP_MVNREPO));
appLoader = new ApplicationLoader(resourcePool, depLoader);

resultCacheInSeconds =
Expand Down Expand Up @@ -486,7 +487,7 @@ public void reconnect(String host, int port) throws InterpreterRPCException, TEx
this.intpEventServerHost = host;
this.intpEventServerPort = port;
intpEventClient = new RemoteInterpreterEventClient(intpEventServerHost, intpEventServerPort,
Integer.parseInt(zProperties.getProperty("zeppelin.interpreter.connection.poolsize", "100")));
Integer.parseInt(zProperties.getProperty(InterpreterConfigKeys.INTERPRETER_CONNECTION_POOL_SIZE, "100")));
intpEventClient.setIntpGroupId(interpreterGroupId);

this.angularObjectRegistry = new AngularObjectRegistry(interpreterGroup.getId(), intpEventClient);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.zeppelin.scheduler;

import org.apache.zeppelin.common.InterpreterConfigKeys;
import org.apache.zeppelin.util.ExecutorUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -59,7 +60,7 @@ private static int getSchedulerPoolSize() {
if (envValue != null) {
return Integer.parseInt(envValue);
}
String propValue = System.getProperty("zeppelin.scheduler.threadpool.size");
String propValue = System.getProperty(InterpreterConfigKeys.SCHEDULER_THREADPOOL_SIZE);
if (propValue != null) {
return Integer.parseInt(propValue);
}
Expand Down
Loading
Loading