Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 46 additions & 46 deletions pleth/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ def json(self) -> typing.Dict:
'n': f'{self.n:064x}',
}

def pubkey(self):
def pubkey(self) -> PubKey:
pubkey = pleth.secp256k1.G * pleth.secp256k1.Fr(self.n)
return PubKey(pubkey.x.x, pubkey.y.x)

@classmethod
def random(cls) -> typing.Self:
def random(cls) -> PriKey:
return PriKey(max(1, secrets.randbelow(pleth.secp256k1.N)))

def sign(self, data: bytearray) -> bytearray:
Expand Down Expand Up @@ -92,7 +92,7 @@ def pt(self) -> pleth.secp256k1.Pt:
return pleth.secp256k1.Pt(pleth.secp256k1.Fq(self.x), pleth.secp256k1.Fq(self.y))

@classmethod
def pt_decode(cls, data: pleth.secp256k1.Pt) -> typing.Self:
def pt_decode(cls, data: pleth.secp256k1.Pt) -> PubKey:
return PubKey(data.x.x, data.y.x)


Expand Down Expand Up @@ -127,15 +127,15 @@ def __repr__(self) -> str:

def envelope(self) -> bytearray:
return pleth.rlp.encode([
pleth.rlp.put_uint(self.nonce),
pleth.rlp.put_uint(self.gas_price),
pleth.rlp.put_uint(self.gas),
self.nonce,
self.gas_price,
self.gas,
self.to if self.to else bytearray(),
pleth.rlp.put_uint(self.value),
self.value,
self.data,
pleth.rlp.put_uint(self.v),
pleth.rlp.put_uint(self.r),
pleth.rlp.put_uint(self.s),
self.v,
self.r,
self.s,
])

def hash(self) -> bytearray:
Expand All @@ -157,15 +157,15 @@ def json(self) -> typing.Dict:
def sign(self, prikey: PriKey) -> None:
# EIP-155: https://github.com/ethereum/EIPs/blob/master/EIPS/eip-155.md
sign = prikey.sign(hash(pleth.rlp.encode([
pleth.rlp.put_uint(self.nonce),
pleth.rlp.put_uint(self.gas_price),
pleth.rlp.put_uint(self.gas),
self.nonce,
self.gas_price,
self.gas,
self.to if self.to else bytearray(),
pleth.rlp.put_uint(self.value),
self.value,
self.data,
pleth.rlp.put_uint(pleth.config.current.chain_id),
pleth.rlp.put_uint(0),
pleth.rlp.put_uint(0),
pleth.config.current.chain_id,
0,
0,
])))
self.r = int.from_bytes(sign[0x00:0x20])
self.s = int.from_bytes(sign[0x20:0x40])
Expand Down Expand Up @@ -205,17 +205,17 @@ def __repr__(self) -> str:

def envelope(self) -> bytearray:
return bytearray([0x01]) + pleth.rlp.encode([
pleth.rlp.put_uint(self.chain_id),
pleth.rlp.put_uint(self.nonce),
pleth.rlp.put_uint(self.gas_price),
pleth.rlp.put_uint(self.gas),
self.chain_id,
self.nonce,
self.gas_price,
self.gas,
self.to if self.to else bytearray(),
pleth.rlp.put_uint(self.value),
self.value,
self.data,
self.access_list,
pleth.rlp.put_uint(self.v),
pleth.rlp.put_uint(self.r),
pleth.rlp.put_uint(self.s),
self.v,
self.r,
self.s,
])

def hash(self) -> bytearray:
Expand All @@ -238,12 +238,12 @@ def json(self) -> typing.Dict:

def sign(self, prikey: PriKey) -> None:
sign = prikey.sign(hash(bytearray([0x01]) + pleth.rlp.encode([
pleth.rlp.put_uint(self.chain_id),
pleth.rlp.put_uint(self.nonce),
pleth.rlp.put_uint(self.gas_price),
pleth.rlp.put_uint(self.gas),
self.chain_id,
self.nonce,
self.gas_price,
self.gas,
self.to if self.to else bytearray(),
pleth.rlp.put_uint(self.value),
self.value,
self.data,
self.access_list,
])))
Expand Down Expand Up @@ -290,18 +290,18 @@ def __repr__(self) -> str:

def envelope(self) -> bytearray:
return bytearray([0x02]) + pleth.rlp.encode([
pleth.rlp.put_uint(self.chain_id),
pleth.rlp.put_uint(self.nonce),
pleth.rlp.put_uint(self.gas_tip_cap),
pleth.rlp.put_uint(self.gas_fee_cap),
pleth.rlp.put_uint(self.gas),
self.chain_id,
self.nonce,
self.gas_tip_cap,
self.gas_fee_cap,
self.gas,
self.to if self.to else bytearray(),
pleth.rlp.put_uint(self.value),
self.value,
self.data,
self.access_list,
pleth.rlp.put_uint(self.v),
pleth.rlp.put_uint(self.r),
pleth.rlp.put_uint(self.s),
self.v,
self.r,
self.s,
])

def hash(self) -> bytearray:
Expand All @@ -325,13 +325,13 @@ def json(self) -> typing.Dict:

def sign(self, prikey: PriKey) -> None:
sign = prikey.sign(hash(bytearray([0x02]) + pleth.rlp.encode([
pleth.rlp.put_uint(self.chain_id),
pleth.rlp.put_uint(self.nonce),
pleth.rlp.put_uint(self.gas_tip_cap),
pleth.rlp.put_uint(self.gas_fee_cap),
pleth.rlp.put_uint(self.gas),
self.chain_id,
self.nonce,
self.gas_tip_cap,
self.gas_fee_cap,
self.gas,
self.to if self.to else bytearray(),
pleth.rlp.put_uint(self.value),
self.value,
self.data,
self.access_list,
])))
Expand All @@ -353,7 +353,7 @@ def hash(self) -> bytearray:
data = '\x19Ethereum Signed Message:\n'
data += str(len(self.data))
data += self.data
return pleth.core.hash(data.encode())
return pleth.core.hash(bytearray(data.encode()))

def pubkey(self, sig: bytearray) -> PubKey:
m = pleth.secp256k1.Fr(int.from_bytes(self.hash()))
Expand Down
1 change: 1 addition & 0 deletions pleth/ecdsa.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def sign(prikey: pleth.secp256k1.Fr, m: pleth.secp256k1.Fr) -> typing.Tuple[plet
if R.x.x >= pleth.secp256k1.N:
v |= 2
return r, s, v
raise Exception('unreachable')


def verify(pubkey: pleth.secp256k1.Pt, m: pleth.secp256k1.Fr, r: pleth.secp256k1.Fr, s: pleth.secp256k1.Fr) -> bool:
Expand Down
175 changes: 53 additions & 122 deletions pleth/rlp.py
Original file line number Diff line number Diff line change
@@ -1,137 +1,68 @@
import typing


def encode_byte(data: bytearray) -> bytearray:
body = bytearray()
if len(data) == 0x01 and data[0] <= 0x7f:
body.extend(data)
return body
if len(data) <= 0x37:
body.append(0x80 + len(data))
body.extend(data)
return body
if len(data):
size = put_uint(len(data))
def encode(data: int | bytearray | typing.List) -> bytearray:
if isinstance(data, int):
return encode(bytearray(data.to_bytes(32)).lstrip(bytearray([0x00])))
if isinstance(data, bytearray):
body = bytearray()
if len(data) == 0x01 and data[0] <= 0x7f:
body.extend(data)
return body
if len(data) <= 0x37:
body.append(0x80 + len(data))
body.extend(data)
return body
size = bytearray(len(data).to_bytes(32)).lstrip(bytearray([0x00]))
body.append(0xb7 + len(size))
body.extend(size)
body.extend(data)
return body


def encode_list(data: typing.List[bytearray]) -> bytearray:
head = bytearray()
body = bytearray()
for e in data:
body.extend(encode(e))
if len(body) <= 0x37:
head.append(0xc0 + len(body))
return head + body
if len(body):
size = put_uint(len(body))
if isinstance(data, list):
head = bytearray()
body = bytearray()
for e in data:
body.extend(encode(e))
if len(body) <= 0x37:
head.append(0xc0 + len(body))
return head + body
size = bytearray(len(body).to_bytes(32)).lstrip(bytearray([0x00]))
head.append(0xf7 + len(size))
head.extend(size)
return head + body
raise Exception('unreachable')


def encode(data: bytearray | typing.List[bytearray]) -> bytearray:
if isinstance(data, bytearray):
return encode_byte(data)
if isinstance(data, list):
return encode_list(data)


def decode_byte(data: bytearray) -> bytearray:
head = data[0]
assert head <= 0xbf
if head <= 0x7f:
assert len(data) == 1
body = data.copy()
return body
if head <= 0xb7:
size = head - 0x80
assert len(data) == 1 + size
body = data[1:].copy()
return body
if head <= 0xbf:
nlen = head - 0xb7
def decode(data: bytearray) -> bytearray | typing.List[bytearray]:
if data[0] <= 0x7f:
return bytearray([data[0]])
if data[0] <= 0xb7:
size = data[0] - 0x80
return data[1:1+size]
if data[0] <= 0xbf:
nlen = data[0] - 0xb7
size = int.from_bytes(data[1:1+nlen])
assert len(data) == 1 + nlen + size
body = data[1+nlen:].copy()
body = data[1+nlen:1+nlen+size]
return body


def decode_list(data: bytearray) -> typing.List[bytearray]:
head = data[0]
assert head >= 0xc0
offs = 0
body = []
if offs == 0 and head <= 0xf7:
size = head - 0xc0
assert len(data) == 1 + size
offs += 1
if offs == 0 and head <= 0xff:
nlen = head - 0xf7
if data[0] <= 0xf7:
size = data[0] - 0xc0
body = data[1:1+size]
rets = []
offs = 0
while offs < len(body):
item = decode(body[offs:])
rets.append(item)
offs += len(encode(item))
return rets
if data[0] <= 0xff:
nlen = data[0] - 0xf7
size = int.from_bytes(data[1:1+nlen])
assert len(data) == 1 + nlen + size
offs += 1
offs += nlen
for _ in range(1 << 0xf):
if offs >= len(data):
break
head = data[offs]
if head < 0x80:
body.append(decode_byte(data[offs: offs+1]))
offs += 1
continue
if head < 0xb8:
size = head - 0x80
body.append(decode_byte(data[offs: offs+1+size]))
offs += 1
offs += size
continue
if head < 0xc0:
nlen = head - 0xb7
size = int.from_bytes(data[offs+1:offs+1+nlen])
body.append(decode_byte(data[offs: offs+1+nlen+size]))
offs += 1
offs += nlen
offs += size
continue
if head < 0xf8:
size = head - 0xc0
body.append(decode_list(data[offs: offs+1+size]))
offs += 1
offs += size
continue
if head:
nlen = head - 0xf7
size = int.from_bytes(data[offs+1:offs+1+nlen])
body.append(decode_list(data[offs: offs+1+nlen+size]))
offs += 1
offs += nlen
offs += size
continue
return body


def decode(data: bytearray) -> bytearray | typing.List[bytearray]:
if data[0] <= 0xbf:
return decode_byte(data)
if data[0]:
return decode_list(data)


def get_bool(data: bytearray) -> bool:
return len(data) == 1


def get_uint(data: bytearray) -> int:
return int.from_bytes(data)


def put_bool(data: bool) -> bytearray:
return bytearray([0x01]) if data else bytearray()


def put_uint(data: int) -> bytearray:
return bytearray(data.to_bytes(32)).lstrip(bytearray([0x00]))
body = data[1+nlen:1+nlen+size]
rets = []
offs = 0
while offs < len(body):
item = decode(body[offs:])
rets.append(item)
offs += len(encode(item))
return rets
raise Exception('unreachable')
7 changes: 4 additions & 3 deletions pleth/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@


def call(method: str, params: typing.List) -> typing.Any:
call.rate = getattr(call, 'rate', pleth.rate.Limits(pleth.config.current.rpc.qps, 1))
call.rate.wait(1)
if not hasattr(call, 'rate'):
setattr(call, 'rate', pleth.rate.Limits(pleth.config.current.rpc.qps, 1))
getattr(call, 'rate').wait(1)
r = requests.post(pleth.config.current.rpc.url, json={
'id': random.randint(0x00000000, 0xffffffff),
'jsonrpc': '2.0',
Expand Down Expand Up @@ -154,7 +155,7 @@ def eth_protocol_version() -> str:
return call('eth_protocolVersion', [])


def eth_send_raw_transaction(tx: typing.Dict) -> str:
def eth_send_raw_transaction(tx: str) -> str:
return call('eth_sendRawTransaction', [tx])


Expand Down
Loading