Skip to content

Commit a5a033d

Browse files
committed
Refactor and enhance FlowSynx CSV Plugin
- Added support for a new `read` operation to convert structured data into CSV format. - Refactored `CsvPlugin` to use `IPluginOperation` for modular operation handling. - Introduced `FilterOperation`, `MapOperation`, and `ReadOperation` classes. - Added parameter classes (`FilterParameters`, `MapParameters`, `ReadParameters`) with metadata annotations. - Centralized CSV handling logic in the new `ParseDataHelper` class. - Updated `README.md` with new examples, descriptions, and parameter details. - Removed legacy classes (`InputParameter`, `ICsvOperationHandler`, and its implementations). - Upgraded target framework to `net10.0` and updated dependencies. - Improved code maintainability, extensibility, and usability. #18
1 parent 5d9b6d2 commit a5a033d

17 files changed

Lines changed: 508 additions & 276 deletions

README.md

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# FlowSynx CSV Plugin
1+
## FlowSynx CSV Plugin
22

33
The CSV Plugin is a pre-packaged, plug-and-play integration component for the FlowSynx engine. It enables reading from and writing to CSV files with configurable parameters such as file path, delimiter, headers, and encoding. Designed for FlowSynx’s no-code/low-code automation workflows, this plugin simplifies data extraction and transformation tasks.
44

@@ -15,18 +15,20 @@ The CSV Plugin allows FlowSynx users to:
1515

1616
## Supported Operations
1717

18+
- **read**: Converts a structured object (e.g., rows from a database, in-memory objects) into CSV output using the provided parameters (e.g., delimiter, headers).
1819
- **filter**: Filters rows in the CSV using defined `Filter` conditions. Supports logical operations (`and`, `or`) and common operators like `equals`, `contains`, `startsWith`, `endsWith`, `greaterThan`, and `lessThan`.
1920
- **map**: Maps existing fields in the CSV to a new subset of keys or column arrangement for simplified output.
20-
- **read**: Reads the structured object (e.g., from database) and returns it as a CSV data.
21+
22+
Minimum FlowSynx version: 1.3.0.
2123

2224
## Input Parameters
2325

2426
The plugin accepts the following parameters:
2527

26-
- `Operation` (string): **Required.** The type of operation to perform. Supported values are `filter` and `map`.
27-
- `Data` (string/object): **Required.** The raw CSV string to process.
28+
- `Operation` (string): Required. The type of operation to perform. Supported values are `read`, `filter`, and `map`.
29+
- `Data` (string/object): Required. The input to process. For `filter` and `map`, provide a raw CSV string. For `read`, provide a structured object or rows to convert to CSV.
2830
- `Delimiter` (string): Optional. Defaults to `,`. The character used to separate fields in the CSV.
29-
- `Mappings` (list): **Required for `map` operation.** Defines which fields to include in the output.
31+
- `Mappings` (list): Required for `map` operation. Defines which fields to include in the output.
3032
- `IgnoreBlankLines` (bool): Optional. Specifies whether blank lines in the CSV should be ignored (`true`) or treated as data rows (`false`). Defaults to `true`.
3133
- `HasHeader` (bool): Optional. Indicates if the first row of the CSV contains headers (`true`) or data (`false`). Defaults to `true`.
3234
- `Filters` (object): Optional. Used with the `filter` operation to define filtering criteria.
@@ -35,7 +37,6 @@ The plugin accepts the following parameters:
3537

3638
```json
3739
{
38-
"Operation": "map",
3940
"Data": { ... },
4041
"Mappings": ["LastName", "Email"],
4142
"IgnoreBlankLines": true,
@@ -46,6 +47,19 @@ The plugin accepts the following parameters:
4647

4748
## Operation Examples
4849

50+
### read Operation
51+
52+
**Input Data (object):**
53+
54+
```json
55+
{
56+
"Data": [ /* csv string or structured rows */ ],
57+
"IgnoreBlankLines": true,
58+
"HasHeader": true,
59+
"Delimiter": ","
60+
}
61+
```
62+
4963
### map Operation
5064

5165
**Input Data:**
@@ -61,7 +75,6 @@ CustomerID,FirstName,LastName,Email,Phone,Country
6175
**Input Parameters:**
6276
```json
6377
{
64-
"Operation": "map",
6578
"Data": { ... },
6679
"Mappings": ["LastName", "Email"],
6780
"IgnoreBlankLines": true,
@@ -95,7 +108,6 @@ CustomerID,FirstName,LastName,Email,Phone,Country
95108
**Input Parameters:**
96109
```json
97110
{
98-
"Operation": "filter",
99111
"Data": { ... },
100112
"Filters": {
101113
"Logic": "and",

src/CsvPlugin.cs

Lines changed: 36 additions & 167 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,18 @@
11
using FlowSynx.PluginCore.Helpers;
22
using FlowSynx.PluginCore;
3-
using FlowSynx.Plugins.Csv.Models;
43
using FlowSynx.PluginCore.Extensions;
54
using FlowSynx.Plugins.Csv.Services;
6-
using CsvHelper.Configuration;
7-
using CsvHelper;
8-
using System.Globalization;
9-
using System.Dynamic;
5+
using FlowSynx.Plugins.Csv.Operations;
6+
using FlowSynx.Plugins.Csv.Parameters;
107

118
namespace FlowSynx.Plugins.Csv;
129

13-
public class CsvPlugin: IPlugin
10+
public class CsvPlugin : IPlugin
1411
{
1512
private IPluginLogger? _logger;
1613
private readonly IGuidProvider _guidProvider;
1714
private readonly IReflectionGuard _reflectionGuard;
18-
private CsvPluginSpecifications _csvSenderSpecifications = null!;
15+
private CsvPluginSpecifications? _specifications = null;
1916
private bool _isInitialized;
2017

2118
public CsvPlugin() : this(new GuidProvider(), new DefaultReflectionGuard()) { }
@@ -32,7 +29,7 @@ internal CsvPlugin(IGuidProvider guidProvider, IReflectionGuard reflectionGuard)
3229
Name = "Csv",
3330
CompanyName = "FlowSynx",
3431
Description = Resources.PluginDescription,
35-
Version = new Version(1, 2, 3),
32+
Version = new Version(1, 3, 0),
3633
Category = PluginCategory.Data,
3734
Authors = new List<string> { "FlowSynx" },
3835
Copyright = "© FlowSynx. All rights reserved.",
@@ -41,35 +38,40 @@ internal CsvPlugin(IGuidProvider guidProvider, IReflectionGuard reflectionGuard)
4138
RepositoryUrl = "https://github.com/flowsynx/plugin-csv",
4239
ProjectUrl = "https://flowsynx.io",
4340
Tags = new List<string>() { "flowSynx", "csv", "comma-separated-values", "data", "data-platform" },
44-
MinimumFlowSynxVersion = new Version(1, 1, 1),
41+
MinimumFlowSynxVersion = new Version(1, 3, 0),
4542
};
4643

47-
public PluginSpecifications? Specifications { get; set; }
44+
public IPluginSpecifications? Specifications => _specifications;
4845

49-
public Type SpecificationsType => typeof(CsvPluginSpecifications);
50-
51-
private Dictionary<string, ICsvOperationHandler> OperationMap => new(StringComparer.OrdinalIgnoreCase)
46+
public IReadOnlyCollection<IPluginOperation> SupportedOperations { get; } = new IPluginOperation[]
5247
{
53-
["read"] = new ReadOperationHandler(),
54-
["filter"] = new FilterOperationHandler(),
55-
["map"] = new MapOperationHandler()
48+
new ReadOperation(),
49+
new FilterOperation(),
50+
new MapOperation()
5651
};
5752

58-
public IReadOnlyCollection<string> SupportedOperations => OperationMap.Keys;
59-
60-
public Task Initialize(IPluginLogger logger)
53+
public Task InitializeAsync(IPluginLogger logger, IDictionary<string, object?>? specifications)
6154
{
6255
if (ReflectionHelper.IsCalledViaReflection())
6356
throw new InvalidOperationException(Resources.ReflectionBasedAccessIsNotAllowed);
6457

65-
ArgumentNullException.ThrowIfNull(logger);
66-
_csvSenderSpecifications = Specifications.ToObject<CsvPluginSpecifications>();
67-
_logger = logger;
58+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
59+
60+
var csvSpecifications = new CsvPluginSpecifications();
61+
if (specifications != null)
62+
csvSpecifications.FromDictionary(specifications);
63+
64+
csvSpecifications.Validate();
65+
_specifications = csvSpecifications;
66+
6867
_isInitialized = true;
6968
return Task.CompletedTask;
7069
}
7170

72-
public async Task<object?> ExecuteAsync(PluginParameters parameters, CancellationToken cancellationToken)
71+
public async Task<object?> ExecuteAsync(
72+
string? operationName,
73+
PluginParameters parameters,
74+
CancellationToken cancellationToken)
7375
{
7476
cancellationToken.ThrowIfCancellationRequested();
7577

@@ -79,155 +81,22 @@ public Task Initialize(IPluginLogger logger)
7981
if (!_isInitialized)
8082
throw new InvalidOperationException($"Plugin '{Metadata.Name}' v{Metadata.Version} is not initialized.");
8183

82-
var inputParameter = parameters.ToObject<InputParameter>();
83-
if (!OperationMap.TryGetValue(inputParameter.Operation, out var handler))
84-
{
85-
throw new NotSupportedException($"Operation '{inputParameter.Operation}' is not supported.");
86-
}
87-
88-
var context = ParseDataToContext(inputParameter.Data);
89-
var csv = ReadDataFromPluginContext(context, inputParameter);
90-
91-
using var reader = new StringReader(csv);
92-
using var csvReader = new CsvReader(reader, new CsvConfiguration(CultureInfo.InvariantCulture)
93-
{
94-
Delimiter = inputParameter.Delimiter ?? ",",
95-
IgnoreBlankLines = inputParameter.IgnoreBlankLines ?? true,
96-
HasHeaderRecord = inputParameter.HasHeader ?? true,
97-
TrimOptions = TrimOptions.Trim,
98-
DetectColumnCountChanges = true,
99-
BadDataFound = null
100-
});
101-
102-
var records = csvReader.GetRecords<dynamic>().Select(row =>
103-
{
104-
var expando = new ExpandoObject() as IDictionary<string, object?>;
105-
foreach (var kvp in (IDictionary<string, object?>)row)
106-
{
107-
expando[kvp.Key] = kvp.Value;
108-
}
109-
return (ExpandoObject)expando;
110-
}).ToList();
111-
112-
var result = handler.Handle(records, inputParameter);
113-
var csvString = await ToCsvStringAsync(result, inputParameter);
114-
115-
var structuredData = result
116-
.Select(expando => ((IDictionary<string, object?>)expando)
117-
.ToDictionary(kvp => kvp.Key, kvp => kvp.Value as object))
118-
.ToList();
119-
120-
string filename = $"{_guidProvider.NewGuid()}.csv";
121-
return new PluginContext(filename, "Data")
122-
{
123-
Format = "Csv",
124-
Content = csvString,
125-
StructuredData = structuredData
126-
};
127-
}
128-
129-
private PluginContext ParseDataToContext(object? data)
130-
{
131-
if (data is null)
132-
throw new ArgumentNullException(nameof(data), "Input data cannot be null.");
84+
var operation = SupportedOperations
85+
.FirstOrDefault(op => string.Equals(op.Name, operationName, StringComparison.OrdinalIgnoreCase))
86+
?? throw new NotSupportedException($"Operation '{operationName}' is not supported.");
13387

134-
return data switch
88+
return operation.Name.ToLowerInvariant() switch
13589
{
136-
PluginContext singleContext => singleContext,
137-
IEnumerable<PluginContext> => throw new NotSupportedException("List of PluginContext is not supported."),
138-
string strData => new PluginContext(_guidProvider.NewGuid().ToString(), "Data") { Content = strData },
139-
_ => throw new NotSupportedException("Unsupported input data format.")
140-
};
141-
}
90+
"read" => await ((ReadOperation)operation)
91+
.ExecuteAsync(parameters.ToObject<ReadParameters>(), cancellationToken),
14292

143-
private string ReadDataFromPluginContext(PluginContext pluginContext, InputParameter inputParameter)
144-
{
145-
if (pluginContext.Content is not null)
146-
return pluginContext.Content;
147-
else if (pluginContext.StructuredData is not null)
148-
return StructuredDataToCsv(pluginContext.StructuredData, inputParameter.Delimiter, inputParameter.HasHeader);
149-
else
150-
throw new InvalidDataException(string.Format(Resources.TheEnteredDataIsInvalid, pluginContext.Id));
151-
}
93+
"filter" => await ((FilterOperation)operation)
94+
.ExecuteAsync(parameters.ToObject<FilterParameters>(), cancellationToken),
15295

153-
private string StructuredDataToCsv(List<Dictionary<string, object>>? data, string? delimiter = ",", bool? hasHeader = true)
154-
{
155-
if (data == null || data.Count == 0)
156-
return string.Empty;
96+
"map" => await ((MapOperation)operation)
97+
.ExecuteAsync(parameters.ToObject<MapParameters>(), cancellationToken),
15798

158-
using var writer = new StringWriter();
159-
var config = new CsvConfiguration(CultureInfo.InvariantCulture)
160-
{
161-
Delimiter = delimiter ?? ",",
162-
HasHeaderRecord = hasHeader ?? true,
163-
TrimOptions = TrimOptions.Trim,
164-
DetectColumnCountChanges = true,
165-
BadDataFound = null
99+
_ => throw new InvalidOperationException($"Unsupported operation: {operation.Name}")
166100
};
167-
168-
using var csv = new CsvWriter(writer, config);
169-
170-
// Get all unique headers
171-
var headers = data.SelectMany(d => d.Keys).Distinct().ToList();
172-
173-
// Write headers
174-
foreach (var header in headers)
175-
{
176-
csv.WriteField(header);
177-
}
178-
csv.NextRecord();
179-
180-
// Write rows
181-
foreach (var row in data)
182-
{
183-
foreach (var header in headers)
184-
{
185-
row.TryGetValue(header, out var value);
186-
csv.WriteField(value);
187-
}
188-
csv.NextRecord();
189-
}
190-
191-
return writer.ToString();
192-
}
193-
194-
private async Task<string> ToCsvStringAsync(IEnumerable<ExpandoObject> records, InputParameter inputParameter)
195-
{
196-
using var writer = new StringWriter();
197-
using var csvWriter = new CsvWriter(writer, new CsvConfiguration(CultureInfo.InvariantCulture)
198-
{
199-
Delimiter = inputParameter.Delimiter ?? ",",
200-
IgnoreBlankLines = inputParameter.IgnoreBlankLines ?? true,
201-
HasHeaderRecord = inputParameter.HasHeader ?? true,
202-
TrimOptions = TrimOptions.Trim,
203-
DetectColumnCountChanges = true,
204-
BadDataFound = null
205-
});
206-
207-
// Write header
208-
var firstRecord = records.FirstOrDefault();
209-
if (firstRecord is not null)
210-
{
211-
var headerRow = ((IDictionary<string, object?>)firstRecord).Keys;
212-
foreach (var header in headerRow)
213-
{
214-
csvWriter.WriteField(header);
215-
}
216-
await csvWriter.NextRecordAsync();
217-
218-
// Write rows
219-
foreach (var record in records)
220-
{
221-
var values = (IDictionary<string, object?>)record;
222-
foreach (var value in values.Values)
223-
{
224-
csvWriter.WriteField(value);
225-
}
226-
await csvWriter.NextRecordAsync();
227-
}
228-
}
229-
230-
await csvWriter.FlushAsync();
231-
return writer.ToString();
232101
}
233102
}

src/CsvPluginSpecifications.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
using FlowSynx.PluginCore;
2+
3+
namespace FlowSynx.Plugins.Csv;
4+
5+
public class CsvPluginSpecifications : PluginSpecifications
6+
{
7+
public override void Validate()
8+
{
9+
10+
}
11+
}

src/FlowSynx.Plugins.Csv.csproj

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
<Project Sdk="Microsoft.NET.Sdk">
22

33
<PropertyGroup>
4-
<TargetFramework>net9.0</TargetFramework>
4+
<TargetFramework>net10.0</TargetFramework>
55
<ImplicitUsings>enable</ImplicitUsings>
66
<Nullable>enable</Nullable>
77
</PropertyGroup>
@@ -13,7 +13,7 @@
1313

1414
<ItemGroup>
1515
<PackageReference Include="CsvHelper" Version="33.1.0" />
16-
<PackageReference Include="FlowSynx.PluginCore" Version="1.3.4" />
16+
<PackageReference Include="FlowSynx.PluginCore" Version="1.3.5" />
1717
</ItemGroup>
1818

1919
<ItemGroup>

0 commit comments

Comments
 (0)