Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies = [
"grpcio~=1.71",
"grpcio-tools~=1.71",
"grpcio-health-checking~=1.71",
"web3~=7.0",
"wheel~=0.45",
"rlp~=4.0",
"ipfshttpclient==0.4.13.2",
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ protobuf~=6.30
grpcio~=1.71
grpcio-tools~=1.71
grpcio-health-checking~=1.71
web3~=7.0
wheel~=0.45
rlp~=4.0
ipfshttpclient==0.4.13.2
Expand Down
75 changes: 40 additions & 35 deletions snet/sdk/mpe/payment_channel_provider.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from pathlib import Path

from web3._utils.events import get_event_data
from eth_abi.codec import ABICodec
import pickle

from web3.types import LogReceipt

from snet.sdk.mpe.payment_channel import PaymentChannel
from snet.contracts import get_contract_deployment_block


BLOCKS_PER_BATCH = 5000
BLOCKS_PER_BATCH = 50000
CHANNELS_DIR = Path.home().joinpath(".snet", "cache", "mpe")


Expand All @@ -21,12 +21,11 @@
text="ChannelOpen(uint256,uint256,address,address,address,bytes32,uint256,uint256)")]
self.deployment_block = get_contract_deployment_block(self.web3, "MultiPartyEscrow")
self.mpe_address = mpe_contract.contract.address
print(self.mpe_address)
self.channels_file = CHANNELS_DIR.joinpath(str(self.mpe_address), "channels.pickle")

def update_cache(self):
channels = []
last_read_block = self.deployment_block - 1
last_read_block = int(self.deployment_block) - 1

if not self.channels_file.exists():
print(f"Channels cache is empty. Caching may take some time when first accessing channels.\nCaching in progress...")
Expand All @@ -43,21 +42,24 @@
last_read_block = load_dict["last_read_block"]
channels = load_dict["channels"]

current_block_number = self.web3.eth.block_number
current_block_number = int(self.web3.eth.block_number)

if last_read_block < current_block_number:
new_channels = self._get_all_channels_from_blockchain_logs_to_dicts(last_read_block + 1, current_block_number)
channels = channels + new_channels
last_read_block = current_block_number
new_channels = self._get_all_channels_from_blockchain_logs_to_dicts(last_read_block + 1, int(current_block_number))

with open(self.channels_file, "wb") as f:
dict_to_save = {
"last_read_block": last_read_block,
"channels": channels
}
pickle.dump(dict_to_save, f)
if len(new_channels) > 0:
channels = channels + new_channels
last_read_block = current_block_number

def _event_data_args_to_dict(self, event_data):
with open(self.channels_file, "wb") as f:
dict_to_save = {
"last_read_block": last_read_block,
"channels": channels
}
pickle.dump(dict_to_save, f)

@classmethod
def _event_data_args_to_dict(cls, event_data: dict) -> dict:
return {
"channel_id": event_data["channelId"],
"sender": event_data["sender"],
Expand All @@ -66,25 +68,28 @@
"group_id": event_data["groupId"],
}

def _get_all_channels_from_blockchain_logs_to_dicts(self, starting_block_number, to_block_number):
codec: ABICodec = self.web3.codec

logs = []
from_block = starting_block_number
while from_block <= to_block_number:
to_block = min(from_block + BLOCKS_PER_BATCH, to_block_number)
logs = logs + self.web3.eth.get_logs({"fromBlock": from_block,
"toBlock": to_block,
"address": self.mpe_address,
"topics": self.event_topics})
from_block = to_block + 1

event_abi = self.mpe_contract.contract.events.ChannelOpen._get_event_abi()

event_data_list = [get_event_data(codec, event_abi, l)["args"] for l in logs]
channels_opened = list(map(self._event_data_args_to_dict, event_data_list))

return channels_opened
def _get_all_channels_from_blockchain_logs_to_dicts(self, start_block: int, end_block: int) -> list[dict]:
logs: list[LogReceipt] = []
from_block = start_block
try:
it = 1

Check notice on line 75 in snet/sdk/mpe/payment_channel_provider.py

View check run for this annotation

snet-sonarqube-app / SonarQube Code Analysis

snet/sdk/mpe/payment_channel_provider.py#L75

Remove the unused local variable "it".
while from_block <= end_block:
to_block = min(from_block + BLOCKS_PER_BATCH, end_block)
logs = logs + self.web3.eth.get_logs({"fromBlock": from_block,
"toBlock": to_block,
"address": self.mpe_address,
"topics": self.event_topics})
from_block = to_block + 1
it += 1
except Exception as e:
print(f"WARNING: during channels caching the error occurred: {e}")

finally:
channel_open_event = self.mpe_contract.contract.events.ChannelOpen()
event_data_list = [channel_open_event.process_log(l)["args"] for l in logs]
channels_opened = list(map(self._event_data_args_to_dict, event_data_list))

return channels_opened

def _get_channels_from_cache(self):
self.update_cache()
Expand Down
Loading