Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ public class QueryContext {
/** The key of the request id in the context map. */
private static final String REQUEST_ID_KEY = "request_id";

private static final String PROFILE_KEY = "profile";

/**
* Generates a random UUID and adds to the {@link ThreadContext} as the request id.
*
Expand Down Expand Up @@ -66,4 +68,20 @@ private QueryContext() {
throw new AssertionError(
getClass().getCanonicalName() + " is a utility class and must not be initialized");
}

/**
* Store the profile flag in thread context.
*
* @param profileEnabled whether profiling is enabled
*/
public static void setProfile(boolean profileEnabled) {
ThreadContext.put(PROFILE_KEY, Boolean.toString(profileEnabled));
}

/**
* @return true if profiling flag is set in the thread context.
*/
public static boolean isProfileEnabled() {
return Boolean.parseBoolean(ThreadContext.get(PROFILE_KEY));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package org.opensearch.sql.calcite.utils;

import static java.util.Objects.requireNonNull;
import static org.opensearch.sql.monitor.profile.MetricName.OPTIMIZE_TIME;

import com.google.common.collect.ImmutableList;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -91,6 +92,8 @@
import org.opensearch.sql.calcite.plan.OpenSearchRules;
import org.opensearch.sql.calcite.plan.Scannable;
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
import org.opensearch.sql.monitor.profile.ProfileMetric;
import org.opensearch.sql.monitor.profile.QueryProfiling;

/**
* Calcite Tools Helper. This class is used to create customized: 1. Connection 2. JavaTypeFactory
Expand Down Expand Up @@ -340,6 +343,8 @@ public static class OpenSearchRelRunners {
* org.apache.calcite.tools.RelRunners#run(RelNode)}
*/
public static PreparedStatement run(CalcitePlanContext context, RelNode rel) {
ProfileMetric optimizeTime = QueryProfiling.current().getOrCreateMetric(OPTIMIZE_TIME);
long startTime = System.nanoTime();
final RelShuttle shuttle =
new RelHomogeneousShuttle() {
@Override
Expand All @@ -358,7 +363,9 @@ public RelNode visit(TableScan scan) {
// the line we changed here
try (Connection connection = context.connection) {
final RelRunner runner = connection.unwrap(RelRunner.class);
return runner.prepareStatement(rel);
PreparedStatement preparedStatement = runner.prepareStatement(rel);
optimizeTime.set(System.nanoTime() - startTime);
return preparedStatement;
} catch (SQLException e) {
// Detect if error is due to window functions in unsupported context (bins on time fields)
String errorMsg = e.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.common.utils.QueryContext;
import org.opensearch.sql.datasource.DataSourceService;
import org.opensearch.sql.exception.CalciteUnsupportedException;
import org.opensearch.sql.exception.NonFallbackCalciteException;
import org.opensearch.sql.monitor.profile.MetricName;
import org.opensearch.sql.monitor.profile.ProfileContext;
import org.opensearch.sql.monitor.profile.ProfileMetric;
import org.opensearch.sql.monitor.profile.QueryProfiling;
import org.opensearch.sql.planner.PlanContext;
import org.opensearch.sql.planner.Planner;
import org.opensearch.sql.planner.logical.LogicalPaginate;
Expand Down Expand Up @@ -98,13 +103,18 @@ public void executeWithCalcite(
CalcitePlanContext.run(
() -> {
try {
ProfileContext profileContext =
QueryProfiling.activate(QueryContext.isProfileEnabled());
ProfileMetric metric = profileContext.getOrCreateMetric(MetricName.ANALYZE_TIME);
long analyzeStart = System.nanoTime();
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
RelNode relNode = analyze(plan, context);
relNode = mergeAdjacentFilters(relNode);
RelNode optimized = optimize(relNode, context);
RelNode calcitePlan = convertToCalcitePlan(optimized);
metric.set(System.nanoTime() - analyzeStart);
executionEngine.execute(calcitePlan, context, listener);
} catch (Throwable t) {
if (isCalciteFallbackAllowed(t) && !(t instanceof NonFallbackCalciteException)) {
Expand Down Expand Up @@ -137,6 +147,7 @@ public void explainWithCalcite(
CalcitePlanContext.run(
() -> {
try {
QueryProfiling.noop();
CalcitePlanContext context =
CalcitePlanContext.create(
buildFrameworkConfig(), SysLimit.fromSettings(settings), queryType);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.monitor.profile;

import java.util.concurrent.atomic.LongAdder;

/** Concrete metric backed by {@link LongAdder}. */
final class DefaultMetricImpl implements ProfileMetric {

private final String name;
private final LongAdder value = new LongAdder();

/**
* Construct a metric with the provided name.
*
* @param name metric name
*/
DefaultMetricImpl(String name) {
this.name = name;
}

@Override
public String name() {
return name;
}

@Override
public long value() {
return value.sum();
}

@Override
public void add(long delta) {
value.add(delta);
}

@Override
public void set(long value) {
this.value.reset();
this.value.add(value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.monitor.profile;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/** Default implementation that records profiling metrics. */
public class DefaultProfileContext implements ProfileContext {

private final long startNanos = System.nanoTime();
private long endNanos;
private boolean finished;
private final Map<MetricName, DefaultMetricImpl> metrics = new ConcurrentHashMap<>();
private QueryProfile profile;

public DefaultProfileContext() {}

/** {@inheritDoc} */
@Override
public boolean isEnabled() {
return true;
}

/** {@inheritDoc} */
@Override
public ProfileMetric getOrCreateMetric(MetricName name) {
Objects.requireNonNull(name, "name");
return metrics.computeIfAbsent(name, key -> new DefaultMetricImpl(key.name()));
}

/** {@inheritDoc} */
@Override
public synchronized QueryProfile finish() {
if (finished) {
return profile;
}
finished = true;
endNanos = System.nanoTime();
Map<MetricName, Double> snapshot = new LinkedHashMap<>(MetricName.values().length);
for (MetricName metricName : MetricName.values()) {
DefaultMetricImpl metric = metrics.get(metricName);
double millis = metric == null ? 0d : roundToMillis(metric.value());
snapshot.put(metricName, millis);
}
double totalMillis = roundToMillis(endNanos - startNanos);
profile = new QueryProfile(totalMillis, snapshot);
return profile;
}

private double roundToMillis(long nanos) {
return Math.round((nanos / 1_000_000.0d) * 100.0d) / 100.0d;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.monitor.profile;

/** Named metrics used by query profiling. */
public enum MetricName {
ANALYZE_TIME,
OPTIMIZE_TIME,
OPENSEARCH_TIME,
POST_EXEC_TIME,
FORMAT_TIME
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.monitor.profile;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

/** Disabled profiling context. */
public final class NoopProfileContext implements ProfileContext {

public static final NoopProfileContext INSTANCE = new NoopProfileContext();

private NoopProfileContext() {}

/** {@inheritDoc} */
@Override
public boolean isEnabled() {
return false;
}

/** {@inheritDoc} */
@Override
public ProfileMetric getOrCreateMetric(MetricName name) {
Objects.requireNonNull(name, "name");
return NoopProfileMetric.INSTANCE;
}

/** {@inheritDoc} */
@Override
public QueryProfile finish() {
Map<MetricName, Double> metrics = new LinkedHashMap<>();
for (MetricName metricName : MetricName.values()) {
metrics.put(metricName, 0d);
}
return new QueryProfile(0d, metrics);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.monitor.profile;

/** No-op metric implementation. */
final class NoopProfileMetric implements ProfileMetric {

static final NoopProfileMetric INSTANCE = new NoopProfileMetric();

private NoopProfileMetric() {}

/** {@inheritDoc} */
@Override
public String name() {
return "";
}

/** {@inheritDoc} */
@Override
public long value() {
return 0;
}

/** {@inheritDoc} */
@Override
public void add(long delta) {}

/** {@inheritDoc} */
@Override
public void set(long value) {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.monitor.profile;

/** Context for collecting profiling metrics during query execution. */
public interface ProfileContext extends AutoCloseable {
/**
* @return true when profiling is enabled.
*/
boolean isEnabled();

/**
* Obtain or create a metric with the provided name.
*
* @param name fully qualified metric name
* @return metric instance
*/
ProfileMetric getOrCreateMetric(MetricName name);

/**
* Finalize profiling and return a snapshot.
*
* @return immutable query profile snapshot
*/
QueryProfile finish();

@Override
default void close() {
finish();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.monitor.profile;

/** Metric for query profiling. */
public interface ProfileMetric {
/**
* @return metric name.
*/
String name();

/**
* @return current metric value.
*/
long value();

/**
* Increment the metric by the given delta.
*
* @param delta amount to add
*/
void add(long delta);

/**
* Set the metric to the provided value.
*
* @param value new metric value
*/
void set(long value);
}
Loading
Loading