Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 61 additions & 4 deletions hls4ml/model/profiling.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,20 @@ def activation_types_hlsmodel(model):
data['layer'].append('model')
data['low'].append(-F)
data['high'].append(I - 1 if S else I)

# Input variable precisions (so the type overlay covers inputs)
input_var_names = set()
for input_var in model.get_input_variables():
T = input_var.type.precision
W, I, F, S = ap_fixed_WIFS(T)
data['layer'].append(input_var.name)
data['low'].append(-F)
data['high'].append(I - 1 if S else I)
input_var_names.add(input_var.name)

for layer in model.get_layers():
if layer.name in input_var_names:
continue
T = layer.get_output_variable().type.precision
W, I, F, S = ap_fixed_WIFS(T)
data['layer'].append(layer.name)
Expand Down Expand Up @@ -321,13 +334,50 @@ def _keras_lstm(layer):
)


def _normalize_input_data(X, input_names):
"""Return list of (name, ndarray) pairs from X, respecting input_names order."""
if isinstance(X, dict):
return [(n, np.asarray(X[n])) for n in input_names if n in X]
if isinstance(X, (list, tuple)):
return [(n, np.asarray(a)) for n, a in zip(input_names, X)]
return [(input_names[0], np.asarray(X))]


def _add_input_distributions(data, X, input_names, fmt='longform', plot='boxplot'):
    """Insert distribution entries for the raw input data into *data* (in place).

    For ``fmt='longform'`` the ``'x'``/``'weight'`` columns of the *data*
    dict are extended; for ``fmt='summary'`` one summary record per input is
    appended to the *data* list. Zeros are dropped and magnitudes taken, so
    the entries suit the log-scale profiling plots; an all-zero input is
    reported and skipped entirely.
    """
    for name, arr in _normalize_input_data(X, input_names):
        print(f'  {name}')
        values = arr.flatten()
        values = abs(values[values != 0])
        if len(values) == 0:
            print(f'Input data for {name} contains only zeros, ignoring.')
            continue
        if fmt == 'longform':
            data['x'].extend(values.tolist())
            data['weight'].extend([name] * len(values))
        elif fmt == 'summary':
            summary = array_to_summary(values, fmt=plot)
            summary['weight'] = name
            data.append(summary)


def activations_hlsmodel(model, X, fmt='summary', plot='boxplot'):
if fmt == 'longform':
raise NotImplementedError
elif fmt == 'summary':
data = []

_, trace = model.trace(np.ascontiguousarray(X))
input_vars = model.get_input_variables()
input_names = [var.name for var in input_vars]
_add_input_distributions(data, X, input_names, fmt=fmt, plot=plot)

if isinstance(X, (list, tuple)):
trace_input = [np.ascontiguousarray(xi) for xi in X]
elif isinstance(X, dict):
trace_input = [np.ascontiguousarray(X[name]) for name in input_names]
else:
trace_input = np.ascontiguousarray(X)

_, trace = model.trace(trace_input)

if len(trace) == 0:
raise RuntimeError('ModelGraph must have tracing on for at least 1 layer (this can be set in its config)')
Expand Down Expand Up @@ -390,6 +440,11 @@ def activations_keras(model, X, fmt='longform', plot='boxplot'):
# return summary statistics for matplotlib.axes.Axes.bxp
# or histogram bin edges and heights
data = []

input_names = [layer.name for layer in model.layers if isinstance(layer, keras.layers.InputLayer)]
if input_names:
_add_input_distributions(data, X, input_names, fmt=fmt, plot=plot)

outputs = _get_outputs(
[layer for layer in model.layers if not isinstance(layer, keras.layers.InputLayer)], X, model.input
)
Expand All @@ -403,7 +458,7 @@ def activations_keras(model, X, fmt='longform', plot='boxplot'):
continue
if fmt == 'longform':
data['x'].extend(y.tolist())
data['weight'].extend([layer_name for i in range(len(y))])
data['weight'].extend([layer_name] * len(y))
elif fmt == 'summary':
data.append(array_to_summary(y, fmt=plot))
data[-1]['weight'] = layer_name
Expand All @@ -421,12 +476,14 @@ def weights_torch(model, fmt='longform', plot='boxplot'):


def activations_torch(model, X, fmt='longform', plot='boxplot'):
X = torch.Tensor(X)
if fmt == 'longform':
data = {'x': [], 'weight': []}
elif fmt == 'summary':
data = []

_add_input_distributions(data, X, ['input'], fmt=fmt, plot=plot)

X = torch.Tensor(X)
partial_model = torch.nn.Sequential
layers = []
for layer in model.children():
Expand All @@ -441,7 +498,7 @@ def activations_torch(model, X, fmt='longform', plot='boxplot'):
continue
if fmt == 'longform':
data['x'].extend(y.tolist())
data['weight'].extend([lname for _ in range(len(y))])
data['weight'].extend([lname] * len(y))
elif fmt == 'summary':
data.append(array_to_summary(y, fmt=plot))
data[-1]['weight'] = lname
Expand Down
140 changes: 140 additions & 0 deletions test/pytest/test_profiling_input_layer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Tests for input layer inclusion in profiling plots (Issue #404).

Verifies that profiling activations include the distribution of input data
and that activation type overlays cover input variable precisions.
"""

import numpy as np
import pytest

keras = pytest.importorskip('keras')

import hls4ml # noqa: E402
from hls4ml.model.profiling import ( # noqa: E402
_normalize_input_data,
activation_types_hlsmodel,
activations_keras,
)

# ---------------------------------------------------------------------------
# Tests for _normalize_input_data helper
# ---------------------------------------------------------------------------


class TestNormalizeInputData:
    """Unit tests for the private _normalize_input_data helper."""

    def test_single_ndarray(self):
        arr = np.random.rand(10, 5)
        pairs = _normalize_input_data(arr, ['my_input'])
        assert len(pairs) == 1
        name, value = pairs[0]
        assert name == 'my_input'
        np.testing.assert_array_equal(value, arr)

    def test_list_of_ndarrays(self):
        first = np.random.rand(10, 5)
        second = np.random.rand(10, 3)
        pairs = _normalize_input_data([first, second], ['in1', 'in2'])
        assert len(pairs) == 2
        assert [name for name, _ in pairs] == ['in1', 'in2']
        np.testing.assert_array_equal(pairs[0][1], first)
        np.testing.assert_array_equal(pairs[1][1], second)

    def test_dict_of_ndarrays(self):
        first = np.random.rand(10, 5)
        second = np.random.rand(10, 3)
        pairs = _normalize_input_data({'in1': first, 'in2': second}, ['in1', 'in2'])
        assert len(pairs) == 2
        assert [name for name, _ in pairs] == ['in1', 'in2']
        np.testing.assert_array_equal(pairs[0][1], first)
        np.testing.assert_array_equal(pairs[1][1], second)

    def test_dict_respects_input_names_order(self):
        """Names listed in input_names but absent from X are silently skipped."""
        arr = np.random.rand(10, 5)
        pairs = _normalize_input_data({'in1': arr}, ['in1', 'in2'])
        assert len(pairs) == 1
        assert pairs[0][0] == 'in1'


# ---------------------------------------------------------------------------
# Tests for activations_keras including input distribution
# ---------------------------------------------------------------------------


class TestActivationsKerasIncludesInput:
    """activations_keras must prepend the input distribution."""

    @staticmethod
    def _build_model():
        # Minimal single-Dense model with a named input layer.
        inputs = keras.Input(shape=(10,), name='fc1_input')
        outputs = keras.layers.Dense(5, name='dense1')(inputs)
        return keras.Model(inputs=inputs, outputs=outputs)

    @staticmethod
    def _sample_data():
        # Shifted away from zero so no values are dropped by the zero filter.
        return np.random.rand(50, 10).astype(np.float32) + 0.1

    def test_input_in_summary_format(self):
        model = self._build_model()
        X = self._sample_data()

        summaries = activations_keras(model, X, fmt='summary', plot='boxplot')
        names = [entry['weight'] for entry in summaries]

        assert 'fc1_input' in names, 'Input distribution missing from activations'
        assert names[0] == 'fc1_input', 'Input should be the first entry'

    def test_input_in_longform_format(self):
        model = self._build_model()
        X = self._sample_data()

        longform = activations_keras(model, X, fmt='longform', plot='boxplot')
        seen_order = list(dict.fromkeys(longform['weight'].tolist()))

        assert 'fc1_input' in seen_order, 'Input distribution missing from longform activations'
        assert seen_order[0] == 'fc1_input', 'Input should appear first in longform data'


# ---------------------------------------------------------------------------
# Tests for activation_types_hlsmodel including input variable types
# ---------------------------------------------------------------------------


class TestActivationTypesIncludesInput:
    """activation_types_hlsmodel must include input variable precisions."""

    @staticmethod
    def _convert_simple_model(tmp_path):
        """Build a one-Dense Keras model and convert it to an hls4ml ModelGraph.

        Shared by both tests below; previously this construction/conversion
        boilerplate was duplicated verbatim in each test method.
        """
        inputs = keras.Input(shape=(10,), name='fc1_input')
        x = keras.layers.Dense(5, name='dense1')(inputs)
        model = keras.Model(inputs=inputs, outputs=x)

        config = hls4ml.utils.config_from_keras_model(model, granularity='name')
        return hls4ml.converters.convert_from_keras_model(
            model,
            hls_config=config,
            output_dir=str(tmp_path / 'hls4ml_prj'),
            backend='Vivado',
        )

    def test_input_type_present(self, tmp_path):
        """The input variable must appear as a row in the types table."""
        hls_model = self._convert_simple_model(tmp_path)

        types_df = activation_types_hlsmodel(hls_model)

        assert 'fc1_input' in types_df['layer'].values, 'Input variable type missing from activation_types_hlsmodel'

    def test_input_type_has_valid_range(self, tmp_path):
        """The input type row should have finite low/high bounds."""
        hls_model = self._convert_simple_model(tmp_path)

        types_df = activation_types_hlsmodel(hls_model)
        input_row = types_df[types_df['layer'] == 'fc1_input']

        assert len(input_row) == 1, 'Expected exactly one row for fc1_input'
        assert np.isfinite(input_row['low'].iloc[0])
        assert np.isfinite(input_row['high'].iloc[0])