Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,6 @@ dmypy.json

.vscode/
.idea/
pyvenv.cfg
lib64
bin
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = sonnenbatterie
version = 0.6.1
version = 0.7.1
author = Jan Weltmeyer
description = "Access Sonnenbatterie REST API"
long_description = file: README.md
Expand Down
215 changes: 181 additions & 34 deletions sonnenbatterie/sonnenbatterie.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import sys
import base64, binascii, hashlib, hmac, string

from sonnenbatterie2.sonnenbatterie2 import AsyncSonnenBatterieV2

Expand Down Expand Up @@ -33,16 +34,92 @@ def __init__(self,username,password,ipaddress):


def login(self):
password_sha512 = hashlib.sha512(self.password.encode('utf-8')).hexdigest()
req_challenge=requests.get(self.baseurl+'challenge', timeout=self._batteryLoginTimeout)
req_challenge.raise_for_status()
challenge=req_challenge.json()
response=hashlib.pbkdf2_hmac('sha512',password_sha512.encode('utf-8'),challenge.encode('utf-8'),7500,64).hex()

getsession=requests.post(self.baseurl+'session',{"user":self.username,"challenge":challenge,"response":response}, timeout=self._batteryLoginTimeout)
getsession.raise_for_status()
token=getsession.json()['authentication_token']
self.token=token

salt = requests.get(self.baseurl+'salt/'+self.username, timeout=self._batteryLoginTimeout, allow_redirects=False)##returns a solt after battery got updated to a certatin version (>1.18??)
if salt.status_code==200:
self.create_session_token_new()
else:
password_sha512 = hashlib.sha512(self.password.encode('utf-8')).hexdigest()
req_challenge=requests.get(self.baseurl+'challenge', timeout=self._batteryLoginTimeout)
req_challenge.raise_for_status()
challenge=req_challenge.json()
response=hashlib.pbkdf2_hmac('sha512',password_sha512.encode('utf-8'),challenge.encode('utf-8'),7500,64).hex()

getsession=requests.post(self.baseurl+'session',{"user":self.username,"challenge":challenge,"response":response}, timeout=self._batteryLoginTimeout)
getsession.raise_for_status()
token=getsession.json()['authentication_token']
self.token=token





def create_response_from_values(self, username, password, challenge, salt):
pw_sha512_hex = hashlib.sha512(password.encode("utf-8")).hexdigest()
pw_bytes = pw_sha512_hex.encode("utf-8")
dk = hashlib.pbkdf2_hmac("sha512", pw_bytes, salt.encode("utf-8"), 7500, dklen=64)
derived_hex = dk.hex()
key = derived_hex.encode("utf-8")
response = hmac.new(key, challenge.encode("utf-8"), hashlib.sha256).hexdigest()
return response

def create_session_token_new(self):
session = requests.Session()

# Step 1: challenge
r = session.get(f"{self.baseurl}challenge", timeout=self._timeout)
r.raise_for_status()
try:
challenge = r.json()
if isinstance(challenge, dict):
challenge = challenge.get("challenge") or next(iter(challenge.values()))
except Exception:
challenge = r.text.strip()

# Step 2: salt
r = session.get(f"{self.baseurl}salt/{self.username}", timeout=self._timeout)
r.raise_for_status()
salt = r.json()["salt"]

# Step 4: loop: POST /session, maybe retry with new_challenge
for _ in range(2):
response = self.create_response_from_values(self.username, self.password, challenge, salt)
payload = {
"user": self.username,
"challenge": challenge,
"response": response,
}

r = session.post(
url=f"{self.baseurl}session",
data=payload,
timeout=self._timeout,
)
txt = r.text
if r.status_code >= 400:
raise RuntimeError(
f"Login failed with HTTP {r.status_code}\n"
f"URL: {r.url}\n"
f"Payload: {json.dumps(payload)}\n"
f"challenge: {challenge}\n"
f"salt: {salt}\n"
f"response: {response}\n"
f"Server reply: {txt}"
)
r.raise_for_status()
data = r.json()

if "new_challenge" in data and data["new_challenge"]:
challenge = data["new_challenge"]
continue

if "authentication_token" in data and data["authentication_token"]:
self.token = data["authentication_token"]
return

raise RuntimeError(f"Unexpected /session response: {json.dumps(data)}")

raise RuntimeError("Failed to obtain authentication_token")

def set_login_timeout(self, timeout:int = 120):
self._batteryLoginTimeout = timeout
Expand Down Expand Up @@ -242,36 +319,106 @@ async def login(self):
if self._session is None:
self._session = aiohttp.ClientSession()

pw_sha512 = hashlib.sha512(self.password.encode('utf-8')).hexdigest()
req_challenge = await self._session.get(
self.baseurl+'challenge',
timeout=self._timeout,
)
req_challenge.raise_for_status()

challenge = await req_challenge.json()
response = hashlib.pbkdf2_hmac(
'sha512',
pw_sha512.encode('utf-8'),
challenge.encode('utf-8'),
7500,
64
).hex()

session = await self._session.post(
url = self.baseurl+'session',
data = {"user":self.username,"challenge":challenge,"response":response},
timeout=self._timeout,
)
session.raise_for_status()
token = await session.json()
self.token = token['authentication_token']
salt = await self._session.get(self.baseurl+'salt/'+self.username, timeout=self._timeout, allow_redirects=False)##returns a solt after battery got updated to a certatin version (>1.18??)
if salt.status==200:
await self.create_session_token_new()
else:
pw_sha512 = hashlib.sha512(self.password.encode('utf-8')).hexdigest()
req_challenge = await self._session.get(
self.baseurl+'challenge',
timeout=self._timeout,
)
req_challenge.raise_for_status()

challenge = await req_challenge.json()
response = hashlib.pbkdf2_hmac(
'sha512',
pw_sha512.encode('utf-8'),
challenge.encode('utf-8'),
7500,
64
).hex()

session = await self._session.post(
url = self.baseurl+'session',
data = {"user":self.username,"challenge":challenge,"response":response},
timeout=self._timeout,
)
session.raise_for_status()
token = await session.json()
self.token = token['authentication_token']

# Inisitalite async API v2
if self.sb2 is None:
self.sb2 = AsyncSonnenBatterieV2(ip_address=self.ipaddress, api_token=self.token)



def create_response_from_values(self, username, password, challenge, salt):
pw_sha512_hex = hashlib.sha512(password.encode("utf-8")).hexdigest()
pw_bytes = pw_sha512_hex.encode("utf-8")
dk = hashlib.pbkdf2_hmac("sha512", pw_bytes, salt.encode("utf-8"), 7500, dklen=64)
derived_hex = dk.hex()
key = derived_hex.encode("utf-8")
response = hmac.new(key, challenge.encode("utf-8"), hashlib.sha256).hexdigest()
return response

async def create_session_token_new(self):
# Step 1: challenge
async with self._session.get(f"{self.baseurl}challenge") as r:
r.raise_for_status()
try:
challenge = await r.json()
if isinstance(challenge, dict):
challenge = challenge.get("challenge") or next(iter(challenge.values()))
except Exception:
challenge = (await r.text()).strip()

# Step 2: salt
async with self._session.get(f"{self.baseurl}salt/{self.username}") as r:
r.raise_for_status()
salt = (await r.json())["salt"]


# Step 4: loop: POST /session, maybe retry with new_challenge
for _ in range(2):
response = self.create_response_from_values(self.username,self.password,challenge,salt)
async with self._session.post(

url = self.baseurl+'session',
data = {"user":self.username,"challenge":challenge,"response":response},
timeout=self._timeout,

) as r:
txt = await r.text()
if r.status >= 400:
raise RuntimeError(
f"Login failed with HTTP {r.status}\n"
f"URL: {r.url}\n"
f"Payload: {json.dumps(payload)}\n"
f"challenge: {challenge}\n"
f"salt: {salt}\n"
f"response: {response}\n"
f"Server reply: {txt}"
)
r.raise_for_status()
data = await r.json()

if "new_challenge" in data and data["new_challenge"]:
challenge = data["new_challenge"]
continue

if "authentication_token" in data and data["authentication_token"]:
self.token = data["authentication_token"]
return

raise RuntimeError(f"Unexpected /session response: {json.dumps(data)}")

raise RuntimeError("Failed to obtain authentication_token")




""" Base functions """


Expand Down
49 changes: 49 additions & 0 deletions test/test_login_new.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@

#!/usr/bin/env python3
# I suspect that I haven't got to grips with the way phthon does things, but soppusely this will setup the path to allod for the sonnen batteri moduel to be in a separate location
# To me having to do this for testing seems a horrendous hack
import asyncio
import os
import sys
import time
import json
import sys
import base64, binascii, hashlib, hmac, string
script_path = os.path.realpath(os.path.dirname(__name__))
os.chdir(script_path)
sys.path.append("..")
from pprint import pprint
import hashlib

# this is based on the test code by rust dust

def main():
challenge="39bdd8a304b84c76"
challenge="d43aeb06a6394daa"
username="User"
password="sonnenUser3552!"
salt="04f8996da763b7a969b1028ee3007569eaf3a635486ddab211d512c85b9df8fb_02bbe1e15021947b232050cc88b07772"
testResponse=create_response_from_values(username,password,challenge,salt)
wantedResponse="39b5cb8cc21772b482cdbbcbd135f4a05af4974f3e1b9e4c8aa846d6e64bcc1a"


if testResponse==wantedResponse:
print("It works!")
else:
print("doesnt work :(")

print("Response: "+testResponse)


def create_response_from_values(username, password, challenge, salt):
pw_sha512_hex = hashlib.sha512(password.encode("utf-8")).hexdigest()
pw_bytes = pw_sha512_hex.encode("utf-8")
dk = hashlib.pbkdf2_hmac("sha512", pw_bytes, salt.encode("utf-8"), 7500, dklen=64)
derived_hex = dk.hex()
key = derived_hex.encode("utf-8")
response = hmac.new(key, challenge.encode("utf-8"), hashlib.sha256).hexdigest()
return response


if __name__ == '__main__':
main()