Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using CodeCasa.AutomationPipelines.Lights.Nodes;
using CodeCasa.Lights;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,5 +166,17 @@ private void SetOutputInternal(LightTransition? output)

/// <inheritdoc />
public override string ToString() => GetType().Name;

/// <inheritdoc />
public ValueTask DisposeAsync()
{
if (_newOutputSubject.IsDisposed)
{
return ValueTask.CompletedTask;
}
_newOutputSubject.OnCompleted();
_newOutputSubject.Dispose();
return ValueTask.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,6 @@ namespace CodeCasa.AutomationPipelines.Lights.Nodes
internal class ServiceScopedNode<TState>(IServiceScope serviceScope, IPipelineNode<TState> innerNode)
: IPipelineNode<TState>, IAsyncDisposable
{
public async ValueTask DisposeAsync()
{
await serviceScope.DisposeOrDisposeAsync();
await innerNode.DisposeOrDisposeAsync();
}

public TState? Input
{
get => innerNode.Input;
Expand All @@ -22,5 +16,11 @@ public TState? Input
public IObservable<TState?> OnNewOutput => innerNode.OnNewOutput;

public override string? ToString() => $"{innerNode} (scoped)";

public async ValueTask DisposeAsync()
{
await serviceScope.DisposeOrDisposeAsync();
await innerNode.DisposeOrDisposeAsync();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public IAsyncDisposable SetupLightPipeline<TLight>(TLight light,
/// <param name="light">The light to create a pipeline for.</param>
/// <param name="pipelineBuilder">An action to configure the pipeline behavior.</param>
/// <returns>A configured pipeline for controlling the specified light.</returns>
internal IPipeline<LightTransition> CreateLightPipeline<TLight>(TLight light, Action<ILightTransitionPipelineConfigurator<TLight>> pipelineBuilder) where TLight : ILight
public IPipeline<LightTransition> CreateLightPipeline<TLight>(TLight light, Action<ILightTransitionPipelineConfigurator<TLight>> pipelineBuilder) where TLight : ILight
{
return CreateLightPipelines([light], pipelineBuilder)[light.Id];
}
Expand Down Expand Up @@ -79,22 +79,14 @@ internal Dictionary<string, IPipeline<LightTransition>> CreateLightPipelines<TLi
return configurators.ToDictionary(kvp => kvp.Key, kvp =>
{
var conf = kvp.Value;
IPipeline<LightTransition> pipeline;
IPipeline<LightTransition> pipeline = new Pipeline<LightTransition>(
LightTransition.Off(),
conf.Nodes,
conf.Light.ApplyTransition);
if (conf.LoggingEnabled ?? false)
{
pipeline = new Pipeline<LightTransition>(
$"[{conf.Light.Id}] {conf.LogName}",
LightTransition.Off(),
conf.Nodes,
conf.Light.ApplyTransition,
logger);
}
else
{
pipeline = new Pipeline<LightTransition>(
LightTransition.Off(),
conf.Nodes,
conf.Light.ApplyTransition);
var pipelineLogger = new PipelineLogger<LightTransition>(logger, $"[{conf.Light.Id}] {conf.LogName}");
pipeline.Telemetry.Subscribe(t => pipelineLogger.Log(t));
}

return (IPipeline<LightTransition>)new ServiceScopedPipeline<LightTransition>(lightContextScopes[kvp.Key], pipeline);
Expand Down
31 changes: 31 additions & 0 deletions src/CodeCasa.AutomationPipelines.Lights/Pipeline/PipelineLogger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using Microsoft.Extensions.Logging;

namespace CodeCasa.AutomationPipelines.Lights.Pipeline
{
internal class PipelineLogger<TState>(ILogger<Pipeline<TState>>? logger, string? name)
{
public void Log(PipelineTelemetry<TState> pipelineTelemetry)
{
if (pipelineTelemetry.SourceNodeIndex == null && pipelineTelemetry.DestinationNodeIndex == null)
{
logger?.LogTrace($"{LogPrefix}Input set to [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}]. No nodes registered, passing to pipeline output immediately.");
return;
}

if (pipelineTelemetry.SourceNodeIndex == null)
{
logger?.LogTrace($"{LogPrefix}Input set to [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}]. Passing input to first [Node {pipelineTelemetry.DestinationNodeIndex}] ({pipelineTelemetry.DestinationNodeName}).");
return;
}
if (pipelineTelemetry.DestinationNodeIndex == null)
{
logger?.LogTrace(
$"{LogPrefix}[Node {pipelineTelemetry.SourceNodeIndex}] ({pipelineTelemetry.SourceNodeName}) passed value [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}] to pipeline output.");
return;
}
logger?.LogTrace($"{LogPrefix}Passing [Node {pipelineTelemetry.SourceNodeIndex}] ({pipelineTelemetry.SourceNodeName}) value [{pipelineTelemetry.StateValue?.ToString() ?? "NULL"}] to [Node {pipelineTelemetry.DestinationNodeIndex}] ({pipelineTelemetry.DestinationNodeName}).");
}

private string LogPrefix => name == null ? "" : $"{name}: ";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ public IPipeline<TNode> SetOutputHandler(Action<TNode> action, bool callActionDi
_instance.SetOutputHandler(action, callActionDistinct);
return this;
}

public IReadOnlyCollection<IPipelineNode<TNode>> Nodes => _instance.Nodes;

public IObservable<PipelineTelemetry<TNode>> Telemetry => _instance.Telemetry;
}
10 changes: 10 additions & 0 deletions src/CodeCasa.AutomationPipelines/IPipeline.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,14 @@ public interface IPipeline<TState> : IPipelineNode<TState>, IAsyncDisposable
/// This method can be called at any time during the creation of the pipeline and will be called immediately if the pipeline has already produced an output.
/// </summary>
IPipeline<TState> SetOutputHandler(Action<TState> action, bool callActionDistinct = true);

/// <summary>
/// Gets the collection of nodes registered in the pipeline.
/// </summary>
IReadOnlyCollection<IPipelineNode<TState>> Nodes { get; }

/// <summary>
/// Gets an observable stream of telemetry events that occur during pipeline execution.
/// </summary>
IObservable<PipelineTelemetry<TState>> Telemetry { get; }
}
5 changes: 3 additions & 2 deletions src/CodeCasa.AutomationPipelines/IPipelineNode.cs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
namespace CodeCasa.AutomationPipelines;

namespace CodeCasa.AutomationPipelines;

/// <summary>
/// Represents a node in a pipeline.
/// </summary>
public interface IPipelineNode<TState>
public interface IPipelineNode<TState> : IAsyncDisposable
{
/// <summary>
/// Sets the input state of the node. This will trigger the processing of the input.
Expand Down
Loading