-
Notifications
You must be signed in to change notification settings - Fork 0
OpenDSS ingestion #21
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: staging
Are you sure you want to change the base?
Conversation
|
the calculation within the streaming each row is taking about ~2sec to solve the simulation with mapping of uuids and converting into pyarrow table to have a 30hz streaming data. currently only getting about 30 points per minute. |
cbrochtrup
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is super cool! I can think of a lot of great uses for this tool already. Thanks for sharing!
| _streamset = StreamSet([Stream(conn, uuid=k) for k in | ||
| data_map.keys()]) | ||
| _streamset.arrow_insert(data_map) | ||
| if i % INSERT_BATCH_SIZE == 0: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if end this loop when this condition is not met? will the data not flush? If so, we should add another condition here.
| if i % INSERT_BATCH_SIZE == 0: | |
| if i % INSERT_BATCH_SIZE == 0 or i == new_load.shape[1]-1: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right now for this, i was just testing for running for a few mins at a time, and wanted points inserted with a specific flush condition. i'm not sure if the INSERT_BATCH_SIZE is actually even a good number for flushing.
at the end if not flushed will automatically flushed after 8 hours according to the pqm.
| class TestOpendssIngestor: | ||
| def test_initialize_simulation(self): | ||
| # Arrange | ||
| mock_model_loc = os.path.join( | ||
| MODEL_REL_PATH, | ||
| "Models/13Bus/IEEE13Nodeckt.dss" | ||
| ) | ||
| load, load_names = ([1155., 160., 120., 120., 170., 230., 170., | ||
| 485., 68., 290., 170., 128., 17., 66., 117.], | ||
| ['671', '634a', '634b', '634c', '645', '646', '692', | ||
| '675a', '675b', '675c', '611', '652', '670a', '670b', | ||
| '670c']) | ||
| # Act | ||
| results = initialize_simulation(mock_model_loc) | ||
| assert results[0].tolist() == load | ||
| assert results[-1] == load_names |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice tests. This setup is great and it'd be useful in a fixture to improve our testing suite. IIUC, we have to run test_initialize_simulation before we run test_simulate_network otherwise the dss model won't be loaded.
To that end, we should make the initialization a fixture both tests can explicitly depend on the setup.
| class TestOpendssIngestor: | |
| def test_initialize_simulation(self): | |
| # Arrange | |
| mock_model_loc = os.path.join( | |
| MODEL_REL_PATH, | |
| "Models/13Bus/IEEE13Nodeckt.dss" | |
| ) | |
| load, load_names = ([1155., 160., 120., 120., 170., 230., 170., | |
| 485., 68., 290., 170., 128., 17., 66., 117.], | |
| ['671', '634a', '634b', '634c', '645', '646', '692', | |
| '675a', '675b', '675c', '611', '652', '670a', '670b', | |
| '670c']) | |
| # Act | |
| results = initialize_simulation(mock_model_loc) | |
| assert results[0].tolist() == load | |
| assert results[-1] == load_names | |
| @pytest.fixture(scope="session") | |
| def _initialize_simulation(): | |
| class TestOpendssIngestor: | |
| @pytest.fixutre(scope="session") | |
| def simulation_setup(self, initialize_simulation): | |
| # Arrange | |
| mock_model_loc = os.path.join( | |
| MODEL_REL_PATH, | |
| "Models/13Bus/IEEE13Nodeckt.dss" | |
| ) | |
| # Act | |
| return initialize_simulation(mock_model_loc) | |
| def test_initialize_simulation(self, simulation_setup): | |
| load, load_names = ([1155., 160., 120., 120., 170., 230., 170., | |
| 485., 68., 290., 170., 128., 17., 66., 117.], | |
| ['671', '634a', '634b', '634c', '645', '646', '692', | |
| '675a', '675b', '675c', '611', '652', '670a', '670b', | |
| '670c']) | |
| assert simulation_setup[0].tolist() == load | |
| assert simulation_setup[-1] == load_names |
It may also make sense have run_simulation use an already initialized test. That way we can test these two functions more easily. So maybe the main function looks more like
def main(stuff):
results = initialize_simulation(path)
run_simulation(other_args_plus, results)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still needs more test added definitely. and the initialize_simulation should be called. I wanted this test to make sure that we always have the Model files. all 3 files are needed, or else we get an error in the subsequent calls to the Opendss modeling code.
| mock_model_loc = os.path.join( | ||
| MODEL_REL_PATH, | ||
| "Models/13Bus/IEEE13Nodeckt.dss" | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we also create this model path in simulation_utils. Can we create this path in one spot and use that reference both here and in simulation_utils.py
|
|
||
| def get_loads() -> Tuple[np.ndarray, List[str]]: | ||
| """Get all the loads in the network""" | ||
| load_names = dss.Loads.AllNames() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How does this function know where to load the files from?
| while True: | ||
| for row in data.iterrows(): | ||
| streamset.insert(row.to_dict()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iterrows is terrifyingly slow sometimes, 10x slower than itertuples. Can we use that here instead. The updated code would look something like this:
| while True: | |
| for row in data.iterrows(): | |
| streamset.insert(row.to_dict()) | |
| while True: | |
| for row in data.itertuples(index=False): | |
| streamset.insert(row._asdict()) |
| collection = bus_name | ||
| if ismag: | ||
| lastltr = "M" | ||
| else: | ||
| lastltr = "A" | ||
|
|
||
| name = "V" + phase + lastltr | ||
| return collection, name |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can remove bus_name from this function. We return collection which is equal to bus_name.
Creating opendss_simulations into ingress tool to make available to any python programmers.