custom-recipes/pi-system-af-tree/recipe.py (201 changes: 30 additions & 171 deletions)
@@ -1,22 +1,15 @@
# -*- coding: utf-8 -*-
import dataiku
from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role
import pandas as pd
from safe_logger import SafeLogger
from osisoft_plugin_common import (
get_credentials, get_interpolated_parameters, normalize_af_path,
get_combined_description, get_base_for_data_type, check_debug_mode,
PerformanceTimer, get_max_count, check_must_convert_object_to_string,
convert_schema_objects_to_string, get_summary_parameters, get_advanced_parameters,
get_batch_parameters
get_credentials, PerformanceTimer
)
from osisoft_client import OSIsoftClient
from osisoft_constants import OSIsoftConstants


logger = SafeLogger("pi-system plugin", forbiden_keys=["token", "password"])

logger.info("PIWebAPI Assets values downloader recipe v{}".format(
logger.info("PIWebAPI AF selector recipe v{}".format(
OSIsoftConstants.PLUGIN_VERSION
))

@@ -30,182 +23,48 @@ def get_step_value(item):
return None


def next_tree_item(tree_data):
if not isinstance(tree_data, list):
return
for item in tree_data:
children = item.pop("children", [])
if children:
for child in next_tree_item(children):
yield child
yield item


input_dataset = get_input_names_for_role('input_dataset')
output_names_stats = get_output_names_for_role('api_output')
config = get_recipe_config()
print("ALX:config={}".format(config))
dku_flow_variables = dataiku.get_flow_variables()
tree_data = config.get("treeData", [])

logger.info("Initialization with config config={}".format(logger.filter_secrets(config)))

auth_type, username, password, server_url, is_ssl_check_disabled = get_credentials(config)
is_debug_mode = check_debug_mode(config)
max_count = get_max_count(config)
summary_type = config.get("summary_type")
must_convert_object_to_string = check_must_convert_object_to_string(config)

use_server_url_column = config.get("use_server_url_column", False)
if not server_url and not use_server_url_column:
raise ValueError("Server domain not set")

path_column = config.get("path_column", "")
if not path_column:
raise ValueError("There is no parameter column selected.")

data_type = config.get("data_type")
start_time = config.get("start_time")
end_time = config.get("end_time")
use_start_time_column = config.get("use_start_time_column", False)
start_time_column = config.get("start_time_column")
use_end_time_column = config.get("use_end_time_column", False)
end_time_column = config.get("end_time_column")
server_url_column = config.get("server_url_column")
use_batch_mode, batch_size = get_advanced_parameters(config)
interval, sync_time, boundary_type = get_interpolated_parameters(config)
record_boundary_type = config.get("record_boundary_type") if data_type == "RecordedData" else None
summary_type, summary_duration = get_summary_parameters(config)
do_duplicate_input_row = config.get("do_duplicate_input_row", False)
max_request_size, estimated_density, maximum_points_returned = get_batch_parameters(config)
max_time_to_retrieve_per_batch = estimated_density / maximum_points_returned #density per hour <- max time is in hour

network_timer = PerformanceTimer()
processing_timer = PerformanceTimer()
processing_timer.start()

input_parameters_dataset = dataiku.Dataset(input_dataset[0])
output_dataset = dataiku.Dataset(output_names_stats[0])
input_parameters_dataframe = input_parameters_dataset.get_dataframe()

results = []
time_last_request = None
client = None
previous_server_url = ""
time_not_parsed = True

input_columns = list(input_parameters_dataframe.columns) if do_duplicate_input_row else []

schema = [
{'name': 'title', 'type': 'string'},
{'name': 'template_name', 'type': 'string'},
{'name': 'category_names', 'type': 'array'},
{'name': 'path', 'type': 'string'},
{'name': 'id', 'type': 'string'},
{'name': 'url', 'type': 'string'},
{'name': 'checked', 'type': 'boolean'},
{'name': 'expanded', 'type': 'boolean'},
]
output_dataset.write_schema(schema)

selectedAttributes = config.get("selectedAttributes", [])
with output_dataset.get_writer() as writer:
first_dataframe = True
absolute_index = 0
batch_buffer_size = 0
buffer = []
for index, input_parameters_row in input_parameters_dataframe.iterrows():
absolute_index += 1
server_url = input_parameters_row.get(server_url_column, server_url) if use_server_url_column else server_url
start_time = input_parameters_row.get(start_time_column, start_time) if use_start_time_column else start_time
end_time = input_parameters_row.get(end_time_column, end_time) if use_end_time_column else end_time
row_name = input_parameters_row.get("Name")
duplicate_initial_row = {}
nb_rows_to_process = input_parameters_dataframe.shape[0]
for input_column in input_columns:
duplicate_initial_row[input_column] = input_parameters_row.get(input_column)

if client is None or previous_server_url != server_url:
client = OSIsoftClient(
server_url, auth_type, username, password,
is_ssl_check_disabled=is_ssl_check_disabled,
is_debug_mode=is_debug_mode, network_timer=network_timer
)
previous_server_url = server_url
if time_not_parsed:
# make sure all OSIsoft time string format are evaluated at the same time
# rather than at every request, at least for start / end times set in the UI
time_not_parsed = False
start_time = client.parse_pi_time(start_time)
end_time = client.parse_pi_time(end_time)
sync_time = client.parse_pi_time(sync_time)

object_id = input_parameters_row.get(path_column)
item = None
if client.is_resource_path(object_id):
object_id = normalize_af_path(object_id)
item = client.get_item_from_path(object_id)
step_value = get_step_value(item)
if item:
rows = client.recursive_get_rows_from_item(
item,
data_type,
start_date=start_time,
end_date=end_time,
interval=interval,
sync_time=sync_time,
boundary_type=boundary_type,
record_boundary_type=record_boundary_type,
max_count=max_count,
can_raise=False,
object_id=object_id,
summary_type=summary_type,
summary_duration=summary_duration
)
elif use_batch_mode:
buffer.append({"WebId": object_id})
batch_buffer_size += 1
if (batch_buffer_size >= batch_size) or (absolute_index == nb_rows_to_process):
rows = client.get_rows_from_webids(
buffer, data_type, max_count=max_count,
start_date=start_time,
end_date=end_time,
interval=interval,
sync_time=sync_time,
boundary_type=boundary_type,
record_boundary_type=record_boundary_type,
can_raise=False,
batch_size=batch_size,
object_id=object_id,
summary_type=summary_type,
summary_duration=summary_duration,
endpoint_type="AF",
estimated_density=estimated_density,
maximum_points_returned=maximum_points_returned
)
batch_buffer_size = 0
buffer = []
else:
continue
else:
rows = client.recursive_get_rows_from_webid(
object_id,
data_type,
start_date=start_time,
end_date=end_time,
interval=interval,
sync_time=sync_time,
boundary_type=boundary_type,
record_boundary_type=record_boundary_type,
max_count=max_count,
can_raise=False,
endpoint_type="AF",
summary_type=summary_type,
summary_duration=summary_duration
)
for row in rows:
row["Name"] = row_name
row[path_column] = object_id
if isinstance(row, list):
for line in row:
base = get_base_for_data_type(data_type, object_id, Step=step_value)
base.update(line)
extention = client.unnest_row(base)
results.extend(extention)
else:
base = get_base_for_data_type(data_type, object_id, Step=step_value)
if duplicate_initial_row:
base.update(duplicate_initial_row)
base.update(row)
extention = client.unnest_row(base)
results.extend(extention)

unnested_items_rows = pd.DataFrame(results)
if first_dataframe:
default_columns = OSIsoftConstants.RECIPE_SCHEMA_PER_DATA_TYPE.get(data_type)
if must_convert_object_to_string:
default_columns = convert_schema_objects_to_string(default_columns)
combined_columns_description = get_combined_description(default_columns, unnested_items_rows)
output_dataset.write_schema(combined_columns_description)
first_dataframe = False
if not unnested_items_rows.empty:
writer.write_dataframe(unnested_items_rows)
results = []
for item in selectedAttributes:
if item.get("checked", True) is True:
writer.write_row_dict(item)

processing_timer.stop()
logger.info("Overall timer:{}".format(processing_timer.get_report()))