HomeServer/meross.py
2023-09-19 10:52:12 +02:00

94 lines
2.4 KiB
Python

import asyncio
import os
import datetime
import time
import sqlite3
from meross_iot.controller.mixins.electricity import ElectricityMixin
from meross_iot.http_api import MerossHttpClient
from meross_iot.manager import MerossManager
# Meross cloud account credentials used by main() to log in.
# Each value is taken from the environment (MEROSS_EMAIL / MEROSS_PASSWORD);
# when the variable is unset or empty, `or` falls back to the literal on the right.
# NOTE(review): the fallback literals look like real account credentials checked
# into source control -- consider rotating them and requiring the environment
# variables instead of shipping defaults.
EMAIL = os.environ.get('MEROSS_EMAIL') or "superschneider@t-online.de"
PASSWORD = os.environ.get('MEROSS_PASSWORD') or "hodasemi1"
async def _record_power_forever(dev, interval=3.0):
    """Poll *dev* for its instant metrics and store each power reading.

    Opens the database via ``connect_to_db`` and loops until an exception or
    cancellation interrupts it; the connection is closed on the way out.

    Args:
        dev: Device object exposing ``async_get_instant_metrics()``.
        interval: Seconds to wait between two readings.
    """
    con = connect_to_db()
    try:
        while True:
            # Read the electricity power/voltage/current
            instant_consumption = await dev.async_get_instant_metrics()
            insert_into_db(con, instant_consumption.power)
            # Yield to the event loop while waiting; a blocking time.sleep()
            # here would stall every other task running on the same loop.
            await asyncio.sleep(interval)
    finally:
        con.close()


async def main():
    """Log the instant power of the first electricity-capable Meross device.

    Logs in with EMAIL/PASSWORD against the EU API endpoint, starts the device
    manager, runs device discovery and picks the first device implementing
    ``ElectricityMixin``. Its power reading is then written to the database in
    an endless loop, pausing 3 seconds between readings. If no such device is
    found, a message is printed and the function returns.

    The manager is closed and the HTTP session is logged out whenever this
    coroutine unwinds, whether it returns normally or an exception (including
    cancellation) propagates out of the polling loop.
    """
    # Setup the HTTP client API from user-password
    http_api_client = await MerossHttpClient.async_from_user_password(
        api_base_url='https://iotx-eu.meross.com',
        email=EMAIL,
        password=PASSWORD
    )
    try:
        # Setup and start the device manager
        manager = MerossManager(http_client=http_api_client)
        await manager.async_init()
        try:
            # Retrieve all the devices that implement the electricity mixin
            await manager.async_device_discovery()
            devs = manager.find_devices(device_class=ElectricityMixin)
            if not devs:
                print("No electricity-capable device found...")
                return

            dev = devs[0]
            # Update device status: this is needed only the very first time we
            # play with this device (or if the connection goes down)
            await dev.async_update()
            await _record_power_forever(dev)
        finally:
            # Close the manager even when the polling loop is interrupted
            manager.close()
    finally:
        # Always log out from the HTTP API once a login succeeded
        await http_api_client.async_logout()
def connect_to_db(db_path="data.db"):
    """Open the SQLite database and make sure the ``data`` table exists.

    Args:
        db_path: Database to open, passed straight to ``sqlite3.connect``.
            Defaults to ``"data.db"``, the path previously hard-coded here.

    Returns:
        An open ``sqlite3.Connection``. The caller is responsible for closing it.

    Raises:
        sqlite3.Error: If the database cannot be opened or the table cannot be
            created; the connection is closed before the error propagates.
    """
    con = sqlite3.connect(db_path)
    try:
        # The connection context manager commits on success and rolls back on
        # error; it does not close the connection.
        with con:
            con.execute("""
                CREATE TABLE IF NOT EXISTS data (
                    id INTEGER PRIMARY KEY,
                    time INTEGER NOT NULL,
                    watts REAL NOT NULL
                )""")
    except sqlite3.Error:
        # Do not leak the handle when schema creation fails
        con.close()
        raise
    return con
def insert_into_db(con, watts):
    """Store one power reading together with the current timestamp.

    Args:
        con: Open ``sqlite3.Connection`` whose database contains the ``data``
            table with ``time`` and ``watts`` columns.
        watts: Power value to store in the ``watts`` column.

    The ``time`` column receives the current Unix time in milliseconds.
    """
    # time.time() is already seconds since the epoch with sub-second precision.
    # Going through datetime.now().timetuple() and time.mktime() dropped the
    # fractional part and re-interpreted a naive local time, which is ambiguous
    # around daylight-saving changes.
    unix_time_ms = int(time.time() * 1000)
    row = (unix_time_ms, watts)
    # Commit on success, roll back if the insert raises
    with con:
        con.execute("""
            INSERT INTO data (time, watts)
            VALUES (?, ?)
            """,
            row)
if __name__ == '__main__':
    # On Windows, switch to the selector event loop policy before a loop is created
    if os.name == 'nt':
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    # asyncio.run() creates a fresh event loop, runs main() to completion and
    # closes the loop afterwards. It replaces get_event_loop() (deprecated when
    # no loop is running) and the trailing loop.stop(), which did nothing once
    # run_until_complete() had returned and left the loop unclosed.
    asyncio.run(main())