Shelly, AWS та GCP: як я аналізував споживання електроенергії вдома
Привіт, спільното! Мене звати Олег Можаровський, я працюю Automation QA Engineer з використанням Python. Окрім того, розвиваю експертизу в Data Engineering, аналітиці та контролі якості.
У цьому матеріалі я розповім про свій досвід аналізу споживання електроенергії в окремому домогосподарстві. Я розповім про сам процес вимірювань, розрахунки та висновки, які можна зробити на основі отриманих даних. Також розглянемо IoT Data Analytics зі Small Data (дуже хотілось Big Data, але потік і обсяг даних просто не дотягують). Мій досвід може стати в пригоді тим, хто хоче краще розуміти своє електроспоживання та проводити власні вимірювання. Це такий собі DIY-проєкт для домашньої IoT-аналітики.
Проблема дефіциту електроенергії
Через російські обстріли енергетичних об’єктів постачання електроенергії стало нестабільним. Це вже вплинуло на ціни, а в найближчому майбутньому тарифи можуть зрости ще більше.
Я вважаю, що нинішня система регулювання ціни на електроенергію (ПСО) та розділення тарифів для населення і бізнесу шкодить країні. Такий підхід може мати негативні наслідки в довгостроковій перспективі. Зрозуміло, що електроенергія дешевшати не буде. Тому в цій реальності можна піти двома з половиною шляхами: заробляти більше, споживати ощадливо або мати резервне джерело живлення на час аварійних або планових відключень.
Але перш ніж купувати, наприклад, потужний інвертор Deye на 12 кВт в однокімнатну квартиру з маленькими вікнами на північ, важливо чітко розуміти власне споживання. Кваліфіковані інсталятори завжди про це говорять, а ось ті, хто просто хоче продати техніку, радять брати «із запасом».
Переглянути платіжку за місяць недостатньо. Важливо знати пікові реальні потужності споживання та їхню тривалість, так званий «профайл споживання впродовж дня» та споживання за певні інтервали часу.
Огляд пристроїв для вимірювання
Отож нам потрібний «розумний» лічильник електроенергії. З бажаних характеристик: власний cloud-додаток, Wi-Fi-підключення (щоб не витрачати зайві кошти на окремий ZigBee чи інший пристрій до лічильника), підтримка MQTT (щоб можна було передавати дані на сторонні сервіси), а ще компактність, безпечність і простота інсталяції.
На ринку є китайські та європейські моделі. Перші — це всілякі Atorch, Stonoff, Zemismart та інші клони, що зазвичай працюють через портал Tuya. Їхня основна перевага — нижча вартість. Серед європейських брендів — австрійський Fronius, болгарський Shelly та нідерландський Victron. В Америки своя особливість електроживлення, тому деякі рішення з США не завжди можуть бути застосовані в реаліях України.






Гортайте вбік, щоб подивитися всі зображення
Я вибрав Shelly EM — це компактний пристрій болгарського виробництва, що відповідає нормам ЄС. Серед його переваг — низьке енергоспоживання, комунікація з зовнішнім світом через Wi-Fi, зручне налаштування за допомогою Wi-Fi та Bluetooth. А ще він має додаток і певний cloud-based сервіс. Вміє надсилати телеметрію на сторонні сервіси через MQTT. Може «з коробки» інтегруватися з Ecoflow Powerstream, що допомагає уникнути зворотного перетоку згенерованої електроенергії (про це є матеріали як німецькою, так і українською). Якщо ж говорити про недоліки приладу, то це його ціна.
Підключення та інтеграція Shelly EM
Зараз виробник випустив третє покоління цього пристрою, тож для його встановлення не потрібні складні технічні навички. Проте базове розуміння електротехніки все ж знадобиться.
- Живлення — потрібно підключити дроти до клем L та N.
- Однофазна або трифазна мережа — якщо у вас трифазне живлення, краще купити спеціальну модель замість трьох однофазних лічильників.
- Встановлення — для зручності можна зробити відведення від вхідного автомата, щоб безпечно розмістити пристрій у щитку.
- Датчик струму — його потрібно закріпити на проводі, що йде від автомата. Важливо не переплутати напрямок струму, хоча навіть якщо помилитеся, у додатку є функція реверсу.
Налаштування пристрою відбувається через мобільний додаток, який доступний для Android та iPhone. Увімкніть Bluetooth та додайте пристрій. Якщо з першого разу не вдалося, зробіть скидання відповідно до інструкції та повторіть. Вкажіть домашню Wi-Fi-мережу й пароль. У процесі налаштування пристрій потрібно додати двічі, оскільки в нього є два входи для вимірювань, хоча поки що використовується лише один. Вкажіть тип (номінал) датчика струму: стандартний варіант 50А, але можна обрати 80А, 120А або 400А для трифазної версії. Впоралися? Вітаю, пристрій почав рахувати ваше споживання.


Гортайте вбік, щоб подивитися всі зображення
Експорт даних
IoT
Спочатку я думав, що всі дані зберігатимуться в додатку і їх можна буде легко завантажити для аналізу. Але ніт.




Гортайте вбік, щоб подивитися всі зображення
Виробник надає хмарний сервіс, у якому дані про споживання зберігаються протягом року. У безкоштовній версії інформація агрегується погодинно, а в платній можна переглядати споживання навіть похвилинно. Проте у хмарі доступні лише загальні показники, а не дані про потужність у реальному часі. Завантажити інформацію можна у форматі CSV, але тільки для певного проміжку часу та в агрегованому вигляді. Це не зовсім той формат взаємодії, якого я очікував. Клікати й зберігати похвилинні дані за місяць (чи навіть тиждень) я не мав бажання.
Тут варто згадати, що пристрій підтримує MQTT. Виробник пропонує покрокову інструкцію для ручного налаштування передачі телеметрії в AWS IoT Core. Також є скрипт для автоматизованого налаштування через Linux Shell. Якщо ви працюєте у Windows, можна скористатися WSL (Windows Subsystem for Linux), щоб запустити цей скрипт — для цього треба бути в тій самій мережі, що і пристрій.
Перед налаштуванням я попередньо встановив Python 3.12 у WSL, хоча виробник заявляє, що скрипт зробить це автоматично. Також необхідно мати активний AWS-акаунт з доступом до IoT-сервісів. Перелік достатніх політик виробник зазначає.
Якщо Ви бажаєте експортувати дані в інше місце, дотримуйтесь рекомендацій щодо ручного налаштування. В іншому випадку запускаємо скрипт і вводимо необхідні дані.


Запам’ятовуємо назву, під якою зберегли пристрій, — вона стане root-топіком. Тепер налаштування завершено — можна перевіряти.
Агрегація та збереження даних
Отож MQTT-брокер отримує телеметрію щодо споживання — налаштуймо її зберігання.
Створіть сховище для даних AWS S3 bucket:
aws s3 mb s3://your-bucket-name --region your-region, де
- your-bucket-name — унікальна назва вашого сховища;
- your-region — можна використати us-east-1 (США) або eu-central-1 (Європа).
AWS S3 bucket доступні для взаємодії глобально, але варто дотримуватися одного регіону, оскільки подальші налаштування залежать від нього.
Якщо робота з AWS Cloud Shell здається складною, те саме можна зробити в графічній консолі AWS Console (Web UI):
- перейдіть до сервісу Amazon S3;
- натисніть кнопку Create bucket;
- введіть ім’я бакета та виберіть регіон;
- натисніть Create для створення бакета.
Додайте необхідні права доступу. Тепер підготовлюємо s3-policy.json:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "s3:PutObject", "Resource": "arn:aws:s3:::your-bucket-name/*" } ] }
Створюємо необхідну довірчу політику:
aws iam create-role --role-name IoT-S3-Role --assume-role-policy-document '{"Version": "2012-10-17","Statement": [{"Effect": "Allow","Principal": {"Service": "iot.amazonaws.com"},"Action": "sts:AssumeRole"}]}'
...та політику дозволу на запис в AWS S3:
aws iam put-role-policy --role-name IoT-S3-Role --policy-name S3AccessPolicy --policy-document file://s3-policy.json
Або через AWS Console (Web UI):
- перейдіть до сервісу IAM в AWS Console;
- створіть нову роль IoT-S3-Role;
- додайте до неї політику, яка дозволяє дії s3:PutObject на AWS S3 бакеті.
На створенні правила AWS IoT Core для експорту даних в S3 бакет зупинимось детальніше:
- перейдіть до AWS IoT Core Console;
- виберіть Message Routing → Rules. Натисніть Create, щоб створити нове правило;
- вкажіть назву правила й опис;
- у секції Rule query statement введіть SQL-запит для вибору даних з MQTT-топіків. Наприклад:
SELECT * FROM 'MozharovskysEM/status/em1:0'
- у секції Set one or more actions натисніть Add action та виберіть Send a message to an S3 bucket та виберіть S3 бакет;
- вкажіть ключ (шлях у бакеті), під яким будуть зберігатися файли:
(MozharovskysEM/${parse_time('yyyy', timestamp(), 'UTC')}/${parse_time('MM', timestamp(), 'UTC')}/${parse_time('dd', timestamp(), 'UTC')}/${timestamp()}.json)
- виберіть IAM роль, яку ви створили (IoT_S3_Role);
- натисніть Add action;
- натисніть Create rule.
Створення таке правило через AWS CloudShell станом на зараз неможливо, але можна скористатись Python та AWS SDK:
import boto3 client = boto3.client('iot') sql = "SELECT * FROM 'MozharovskysEM/status/em1:0'" action = { 's3': { 'bucketName': 'your-bucket-name', 'key': 'MozharovskysEM/${parse_time('yyyy', timestamp(), 'UTC')}/${parse_time('MM', timestamp(), 'UTC')}/${parse_time('dd', timestamp(), 'UTC')}/${timestamp()}.json', 'roleArn': 'arn:aws:iam::your-account-id:role/IoT_S3_Role' } } response = client.create_topic_rule( ruleName='YourRuleName', topicRulePayload={ 'sql': sql, 'description': 'Rule to send MQTT messages to S3', 'actions': [action], 'ruleDisabled': False } ) print(response)
Датчик шле кілька топіків з різним форматом даних — можливо, вам буде цікавіше працювати з іншими.
У запиті
SELECT * FROM 'MozharovskysEM/status/em1:0'
важливими є елементи:
- ’MozharovskysEM — назва топіка і пристрою, яку ви визначаєте при додаванні пристрою в AWS IoT Core під час виконання скрипта від виробника;
- em1:0 — визначає перший ввід датчика (якщо в цьому топіку немає даних, використовуйте em1:1).
Також розберемо ключ збереження
'MozharovskysEM/${parse_time('yyyy', timestamp(), 'UTC')}/${parse_time('MM', timestamp(), 'UTC')}/${parse_time('dd', timestamp(), 'UTC')}/${timestamp()}.json'
Телеметрія буде зберігатись в об’єкти (файли) формату ’MozharovskysEM/YYYY/MM/DD/CURRENT_TIMESTAMP.json’
{"current": 0.471, "voltage": 228.1, "act_power": 60.3, "aprt_power": 107.9, "pf": 0.7, "freq": 50.0}
У підсумку за один день надходить
Однак під час розробки виявилось, що за раз AWS Lambda не завжди встигає обробити всі дрібні об’єкти навіть за максимального таймауту. Через це було змінено підхід: замість однієї AWS Lambda було створено AWS State Machine, яка запускає Lambda, перевіряє її виконання і за потреби повторює процес. Своєю чергою AWS Lambda за один раз опрацьовує тільки 50 дрібних об’єктів, додає їхні дані до агрегованого файлу, видаляє оброблені файли й повертає статус AWS State Machine.
Запуск цієї AWS State Machine відбувається за розкладом (наприклад, раз на день о дев’ятій годині ранку) через Amazon Event Bridge.
Код AWS Lambda функції для агрегації:
import logging import json import boto3 from datetime import datetime, timedelta, timezone # Configure logger logger = logging.getLogger() logger.setLevel(logging.INFO) s3_client = boto3.client('s3') def lambda_handler(event, context): has_more_files = False logger.info("Start of function") # Calculate date to process (yesterday) process_date = datetime.now(timezone.utc) - timedelta(days=1) date_prefix = process_date.strftime('%Y/%m/%d') bucket_name = 'yuor_s3_bucket_name' main_directories = ['MozharovskysEM'] transfer_files = list() # Process each directory for main_dir in main_directories: prefix = f'{main_dir}/{date_prefix}/' logger.info(f"Processing directory: {prefix}") paginator = s3_client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix, PaginationConfig={'MaxItems': 50, 'PageSize': 50}) # Iterate through pages of results for page in page_iterator: combined_data = [] files_to_delete = [] if 'Contents' in page: for item in page['Contents']: file_key = item['Key'] # Skip aggregated file based on its name pattern if file_key.endswith(f'{process_date.strftime("%Y-%m-%d")}.json'): logger.info(f"Skipping aggregated file: {file_key}") continue # Read and process file file_response = s3_client.get_object(Bucket=bucket_name, Key=file_key) file_content = file_response['Body'].read().decode('utf-8') try: json_content = json.loads(file_content.strip()) json_content.pop('id', None) json_content.pop('calibration', None) json_content['timestamp'] = file_key.split('/')[-1].split('.')[0] combined_data.append(json_content) except json.JSONDecodeError as source_decode_error: logger.error(f"Error decoding JSON from file {file_key}: {source_decode_error}") files_to_delete.append({'Key': file_key}) combined_filename = f'{process_date.strftime("%Y-%m-%d")}.json' combined_filepath = f'{prefix}{combined_filename}' # Check if aggregated file already exists try: existing_data = s3_client.get_object(Bucket=bucket_name, Key=combined_filepath) existing_content = existing_data['Body'].read().decode('utf-8') for line in existing_content.strip().split('\n'): try: existing_json = json.loads(line) combined_data.append(existing_json) except json.JSONDecodeError as decoding_error: logger.error( f"Error decoding JSON from line in existing file {combined_filepath}: {decoding_error}" ) continue logger.info(f"Existing aggregated data found and merged: {combined_filepath}") except s3_client.exceptions.NoSuchKey: logger.info("No existing file, creating new one") # Write aggregated data body_content = '\n'.join(json.dumps(item) for item in combined_data) s3_client.put_object(Bucket=bucket_name, Key=combined_filepath, Body=body_content) if files_to_delete: delete_response = s3_client.delete_objects( Bucket=bucket_name, Delete={'Objects': files_to_delete} ) logger.info(f"Deleted processed files: {len(files_to_delete)} files") logger.info(f"Batch for {main_dir} processed and files deleted") # Re-check each directory for remaining files for main_dir in main_directories: prefix = f'{main_dir}/{date_prefix}/' logger.info(f"Re-checking directory for remaining files: {prefix}") paginator = s3_client.get_paginator('list_objects_v2') page_iterator = paginator.paginate(Bucket=bucket_name, Prefix=prefix, PaginationConfig={'MaxItems': 50, 'PageSize': 50}) # Iterate through pages of results for page in page_iterator: if 'Contents' not in page: continue for item in page['Contents']: file_key = item['Key'] # Skip aggregated file if not file_key.endswith(f'{process_date.strftime("%Y-%m-%d")}.json'): # If another file is found, set flag to True and exit early has_more_files = True break if has_more_files: break # Return result logger.info(f"Are there more files to process? {has_more_files}") if not has_more_files: for main_dir in main_directories: prefix = f'{main_dir}/{date_prefix}/' combined_filename = f'{process_date.strftime("%Y-%m-%d")}.json' combined_filepath = f'{prefix}{combined_filename}' transfer_files.append(combined_filepath) logger.info("End of function") message = "All data processed for all directories" if not has_more_files else "There are more data to process" return { 'hasMoreFiles': has_more_files, 'filenames': transfer_files, 's3bucket': bucket_name, 'body': json.dumps(message) }
Конфігурація AWS State Machine:
{ "Comment": "A state machine that processes files in an S3 bucket using Lambda functions.", "StartAt": "InvokeІS3PreprocessingLambda", "States": { "InvokeІS3PreprocessingLambda": { "Type": "Task", "Resource": "arn:aws:lambda:_your_region_:_your_account_:function:_agregate_function_name_:1", "Next": "IsThereMoreFilesInS3ToProcess", "ResultPath": "$.lambdaOutput", "Comment": "Invokes Lambda function to preprocess data from S3. Stores output in $.lambdaOutput." }, "IsThereMoreFilesInS3ToProcess": { "Type": "Choice", "Choices": [ { "Variable": "$.lambdaOutput.hasMoreFiles", "BooleanEquals": true, "Next": "InvokeІS3PreprocessingLambda", "Comment": "If there are more files to process, invoke preprocessing Lambda again." }, { "Variable": "$.lambdaOutput.hasMoreFiles", "BooleanEquals": false, "Next": "InvokeTransferToGCPLambda", "Comment": "If there are no more files to process, proceed to transfer files to GCP." } ], "Default": "EndProcessing", "Comment": "Decides next step based on whether there are more files to process." }, "InvokeTransferToGCPLambda": { "Type": "Task", "Resource": "arn:aws:lambda:_your_region_:_your_account_:function:_transfer_to_gcp_function_name_:1", "Parameters": { "s3bucket.$": "$.lambdaOutput.s3bucket", "filenames.$": "$.lambdaOutput.filenames" }, "End": true, "Comment": "Invokes Lambda function to transfer processed files from S3 to GCP. Uses parameters passed from previous Lambda output." }, "EndProcessing": { "Type": "Succeed", "Comment": "Marks end of state machine execution." } } }
Дані з пристрою можна обробляти не лише від одного датчика, але на момент публікації була перевірена робота лише з одним. Після успішної агрегації всі зібрані за день дані формуються в один JSONL-файл, який передається в GCP Bucket.
Виникає питання: для чого передавати дані саме в GCP? Відповідь проста — я краще знайомий із засобами обробки даних у GCP, ніж в AWS. Але навіщо було починати з AWS? Тому що GCP певний час тому відмовився від аналога AWS IoT Core і для реалізації аналогічного функціоналу в GCP треба розгортати окрему інфраструктуру. Тому я вирішив скористатись наявною інфраструктурою AWS IoT Core для отримання даних і надалі для аналізу послуговуватися засобами GCP.
Отже, нам треба передати дані в GCP. Для цього робимо ще одну AWS Lambda функцію (_transfer_to_gcp_function_name_):
import json import logging import boto3 from google.cloud import storage from google.oauth2 import service_account # Configure logger logger = logging.getLogger() logger.setLevel(logging.INFO) def get_gcp_access_secret(): # Define secret name and AWS region secret_name = "GCPTransferJobAccessTokenSecret" region_name = "_your_gcp_bucket_region_" # Create a Secrets Manager client session = boto3.session.Session() client = session.client(service_name='secretsmanager', region_name=region_name) try: # Attempt to retrieve secret value get_secret_value_response = client.get_secret_value(SecretId=secret_name) except Exception as exception_error: # Log an error if secret cannot be retrieved logger.error(f"Error retrieving secret: {exception_error}") return None else: # Check if secret contains a 'SecretString' and log retrieval if 'SecretString' in get_secret_value_response: secret = get_secret_value_response['SecretString'] logger.info(f"Retrieved GCP Access secret: {secret}") return json.loads(secret) else: # Log an error if 'SecretString' is missing logger.error("Secret does not contain 'SecretString'") return None def lambda_handler(event, context): logger.info("Start of function") # Retrieve GCP credentials from secret manager gcp_credentials = service_account.Credentials.from_service_account_info(get_gcp_access_secret()) logger.info(f'Retrieved credentials: {gcp_credentials}') # Extract S3 bucket name and filenames from event aws_s3bucket_name = event.get('s3bucket') filenames = event.get('filenames', []) # Define GCP bucket name gcp_bucket_name = _your_gcp_bucket_name # Create an S3 client s3_client = boto3.client('s3') # Create a GCP Storage client gcp_storage_client = storage.Client(credentials=gcp_credentials) logger.info("GCP Storage Client created successfully") # Access GCP bucket bucket = gcp_storage_client.bucket(gcp_bucket_name) for filename in filenames: # Read file from S3 try: file_obj = s3_client.get_object(Bucket=aws_s3bucket_name, Key=filename) file_content = file_obj['Body'].read() except Exception as s3bucket_exception: logger.error(f"Failed to read file from S3: {s3bucket_exception}") continue # Skip this file and continue with next # Upload file to GCP blob = bucket.blob(filename) try: blob.upload_from_string(file_content) except Exception as gcpbucket_exception: logger.error(f"Failed to upload file to GCP: {gcpbucket_exception}") continue # Skip this file and continue with next logger.info(f"File {filename} successfully transferred to GCP") logger.info("End of function") return { 'statusCode': 200, 'body': json.dumps("Files processing completed") }
Не забуваємо про requirements.txt:
google-cloud-storage google-auth
Оскільки цих бібліотек немає в стандартному наборі для AWS Lambda, їхній код треба буде додати вручну до нашої AWS Lambda функції. Щоб додати Layer до AWS Lambda функції на Python з бібліотеками google-cloud-storage та google-auth, створіть папку python усередині нової папки python-layer та встановіть бібліотеки в Python за допомогою команди
pip install google-cloud-storage google-auth -t .
Створіть ZIP-архів python-layer.zip з папки python, завантажте його в AWS Lambda як новий Layer і додайте цей Layer до вашої Lambda функції через AWS Management Console.
Ще один важливий момент — використання ключа до доступу в GCP Bucket. Його ми зберігаємо вручну в AWS Secrets Manager, а функція get_gcp_access_secret() отримує його звідти під час виконання запиту.
Звичайно ж, треба створити свій проєкт в GCP та GCP Bucket з певним іменем.
Для того щоб отримати ключ від GCP, який ми будемо використовувати в AWS Lambda для передачі об’єкта-файлу з AWS S3 в GCP Bucket, треба створити GCP Service Account, дати йому права на писання в GCP Bucket (наприклад, Storage Admin, хоча це більше, ніж потрібно). Після цього в налаштуваннях акаунту, у вкладці Keys, потрібно створити ключ у форматі JSON. Цей згенерований файл містить усі необхідні дані, які потрібно зберегти в AWS Secrets Manager у raw-форматі.
Тепер наші дані щодо електроспоживання домогосподарства з Shelly EM весело біжать по MQTT до AWS IoT Core, зберігаються та агрегуються в AWS S3, а відтак копіюються до GCP Bucket, де готові до аналізу.
Щоб оптимізувати зберігання, для даних в AWS S3 налаштоване автоматичне перенесення в Glacier Deep Archive через кілька днів після створення. В архів потрапляють уже агреговані добові файли, оскільки дрібні об’єкти видаляються AWS Lambda під час обробки. Функцію автоматичного видалення поки не налаштовував, щоб мати резервну копію на випадок можливих помилок у GCP під час розробки.
Формат агрегованих даних відповідає стандарту JSONL (рядки йдуть без коми вкінці):
{"current": 0.471, "voltage": 228.1, "act_power": 60.3, "aprt_power": 107.9, "pf": 0.7, "freq": 50.0, "timestamp": "1737416994203"} {"current": 0.47, "voltage": 230.8, "act_power": 60.6, "aprt_power": 108.7, "pf": 0.7, "freq": 50.0, "timestamp": "1737417006175"} {"current": 0.467, "voltage": 230.4, "act_power": 61.2, "aprt_power": 108.0, "pf": 0.7, "freq": 50.0, "timestamp": "1737417021189"} {"current": 0.465, "voltage": 230.4, "act_power": 60.7, "aprt_power": 107.5, "pf": 0.7, "freq": 50.0, "timestamp": "1737417037174"} {"current": 0.47, "voltage": 228.8, "act_power": 61.2, "aprt_power": 108.1, "pf": 0.7, "freq": 50.0, "timestamp": "1737417052176"} {"current": 0.466, "voltage": 228.7, "act_power": 60.7, "aprt_power": 107.0, "pf": 0.7, "freq": 50.0, "timestamp": "1737417068163"}
Аналітика даних
Аналіз потужності споживання
Не будемо намагатись робити overengeneering і для початкової аналітики спочатку скористаємося BigQuery.
Перш за все створіть DataSet:
- відкрийте Google Cloud Console і перейдіть до розділу BigQuery — у лівій бічній панелі виберіть або створіть проєкт, у якому буде зберігатися набір даних;
- натисніть кнопку Create Dataset на верхній панелі інструментів — у формі, що з’явиться, введіть ім’я датасету та виберіть регіон зберігання даних (інші параметри можна лишити за замовчуванням).
Або виконайте GCP Cloud Shell команду:
bq mk --dataset --location=[LOCATION] [PROJECT_ID]:[DATASET], де
- [LOCATION] — регіон, де буде розміщено датасет;
- [PROJECT_ID] — ID вашого проєкту в Google Cloud;
- [DATASET] — ім’я нового датасету.
Перейдіть у редактор запитів у BigQuery Studio та створіть зовнішню таблицю на основі вже наявних даних у GCP Bucket, виконавши SQL-запит:
CREATE EXTERNAL TABLE `your_project_id.your_dataset_name._your_external_table_name_` ( current FLOAT64, voltage FLOAT64, act_power FLOAT64, aprt_power FLOAT64, pf FLOAT64, freq FLOAT64, timestamp STRING ) OPTIONS ( format = 'NEWLINE_DELIMITED_JSON', uris = ['gs://your_bucket_name/_USER_DIRECTORY_NAME/*.json'] )
І робимо простий запит, щоб перевірити, що зовнішня таблиця успішно під’єдналась:
SELECT COUNT(*) FROM `_your_project_name._dataset_name_._your_external_table_name_`;
Вже маємо 145 781 запис.
Спробуймо щось складніше:
SELECT FLOOR(aprt_power / 10) * 10 as used_power, COUNT(*) AS detected_count FROM `_your_project_name._dataset_name_._your_external_table_name_` GROUP BY used_power ORDER BY detected_count DESC;
У вкладці Chart переглянемо візуалізацію отриманих даних. Краще обрати тип графіка Scatter, по осі X — used_power, по осі Y — detected_count.
Отриманий графік дає так званий «профіль потужності» домогосподарства. На ньому можна виділити зони низького споживання
На основі цього профілю можна підібрати оптимальний інвертор для резервного живлення. Наприклад, інвертор
Аналіз обсягу споживання
Швидку відповідь про середньодобове споживання можна отримати через мобільний додаток або вебпортал виробника лічильника. Там доступні графіки споживання за день або тиждень. У цьому домогосподарстві середнє споживання 7 кВт-год на добу — іноді менше, іноді більше. Але для глибшого аналізу потрібно розібратися детальніше.
Отже, всі наші виміри графічно можна подати приблизно як на зображенні.
Через певні неоднакові проміжки часу ми маємо значення вимірювання потужності. Вимірювання відбуваються не завжди рівно в 00 секунд, тому значення записуються з певною нерівномірністю, але доволі часто. У межах хвилинного інтервалу утворюється безліч маленьких трапецій, і для розрахунку споживання за хвилину потрібно підсумувати площі цих трапецій. Ну, і врахувати, що значення для 00 секунд треба дообчислити.
Хвилиночку, скажете ви: це ж, трясця, площа складної фігури і використання інтегралів! Ви ж їх вивчали в школі й усе думали, де ж вони знадобляться. Вітаємо, ачівка «скористався інтегралом» розблокована. Але і задачка буде трохи складнішою за шкільну. Щоб обчислити площу під кривою в BigQuery, спочатку потрібно перетворити значення часу у timestamp, після чого згрупувати дані за хвилинними інтервалами.
CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._minute_grouped` AS SELECT TIMESTAMP_MILLIS(CAST(`timestamp` AS INT64)) AS timestamp, TIMESTAMP_TRUNC(TIMESTAMP_MILLIS(CAST(`timestamp` AS INT64)), MINUTE) AS minute, aprt_power FROM `_your_project_name._dataset_name_._your_external_table_name_` ORDER BY timestamp;
Обчислюємо площі під кривою для кожної хвилини. Спочатку створюємо таблицю, яка містить поточні та наступні значення потужності та часу для кожної хвилини.
CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._lead_values` AS SELECT minute, aprt_power, LEAD(aprt_power) OVER (PARTITION BY minute ORDER BY timestamp) AS next_aprt_power, timestamp, LEAD(timestamp) OVER (PARTITION BY minute ORDER BY timestamp) AS next_timestamp FROM `_your_project_name._dataset_name_._minute_grouped`;
А далі — використовуємо створену таблицю _lead_values, щоб обчислити загальне споживання енергії для кожної хвилини.
CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._energy_per_minute` AS SELECT minute, SUM(0.5 * (next_aprt_power + aprt_power) * TIMESTAMP_DIFF(next_timestamp, timestamp, SECOND) / 3600) AS energy_Wh FROM `_your_project_name._dataset_name_._lead_values` WHERE next_timestamp IS NOT NULL GROUP BY minute;
Важливо: оскільки в наш час електроенергія з мережі не завжди доступна, треба виключити з розрахунку хвилини, коли дані не надходили, наприклад, через знеструмлення приладу.
CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._all_minutes` AS WITH time_range AS ( SELECT MIN(minute) AS min_time, MAX(minute) AS max_time FROM `_your_project_name._dataset_name_._minute_grouped` ) SELECT minute FROM time_range, UNNEST(GENERATE_TIMESTAMP_ARRAY(min_time, max_time, INTERVAL 1 MINUTE)) AS minute; CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._energy_per_minute_with_zeros` AS SELECT all_minutes.minute, IFNULL(energy_minutes.energy_Wh, 0) AS energy_Wh FROM `_your_project_name._dataset_name_._all_minutes` all_minutes LEFT JOIN `_your_project_name._dataset_name_._energy_per_minute` energy_minutes ON all_minutes.minute = energy_minutes.minute ORDER BY all_minutes.minute;
Ну і тепер нарешті рахуємо розподілення спожитої електроенергії по часу доби за весь час вимірювання:
CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._hourly_energy_consumption_utc` AS SELECT EXTRACT(HOUR FROM minute) AS hour_of_day, SUM(energy_Wh) AS total_energy_Wh FROM `_your_project_name._dataset_name_._energy_per_minute_with_zeros` GROUP BY hour_of_day ORDER BY hour_of_day;
Тут варто згадати, що UTC — це не київський час, тому для зручності варто враховувати відповідний часовий зсув:
CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._hourly_energy_consumption_kyiv` AS SELECT EXTRACT(HOUR FROM minute AT TIME ZONE "Europe/Kiev") AS hour_of_day_kyiv, SUM(energy_Wh) AS total_energy_Wh FROM `_your_project_name._dataset_name_._energy_per_minute_with_zeros` GROUP BY hour_of_day_kyiv ORDER BY hour_of_day_kyiv;
А щоб уже зовсім красиво — з обиранням діапазону для аналізу:
DECLARE date_range STRING DEFAULT 'all_time'; -- 'all_time', 'last_30_days', 'current_month', 'specific_month' CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._hourly_energy_consumption_kyiv` AS WITH filtered_data AS ( SELECT minute, energy_Wh FROM `_your_project_name._dataset_name_._energy_per_minute_with_zeros` WHERE (date_range = 'all_time' AND TRUE) OR (date_range = 'last_30_days' AND minute >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 30 DAY)) OR (date_range = 'current_month' AND EXTRACT(YEAR FROM minute) = EXTRACT(YEAR FROM CURRENT_DATE()) AND EXTRACT(MONTH FROM minute) = EXTRACT(MONTH FROM CURRENT_DATE())) OR (date_range = 'specific_month' AND EXTRACT(YEAR FROM minute) = 2021 AND EXTRACT(MONTH FROM minute) = 5) -- May 2021 ) SELECT EXTRACT(HOUR FROM minute AT TIME ZONE "Europe/Kiev") AS hour_of_day_kyiv, SUM(energy_Wh) AS total_energy_Wh FROM filtered_data GROUP BY hour_of_day_kyiv ORDER BY hour_of_day_kyiv;
Ну і нарешті переходимо до грошей:
CREATE OR REPLACE TABLE `_your_project_name._dataset_name_._tree_zones_tariff_hourly_kyiv_total` AS SELECT hour_of_day_kyiv, total_energy_Wh, total_energy_Wh / 1000 as total_energy_kWh, total_energy_Wh * 4.32 / 1000 AS cost_1_zone, -- 1 Zone tariff CASE -- 2 Zones tariff WHEN hour_of_day_kyiv BETWEEN 23 AND 24 OR hour_of_day_kyiv BETWEEN 0 AND 7 THEN total_energy_Wh * 2.16 / 1000 ELSE total_energy_Wh * 4.32 / 1000 END AS cost_2_zone, CASE -- 3 Zones tariff WHEN hour_of_day_kyiv BETWEEN 23 AND 24 OR hour_of_day_kyiv BETWEEN 0 AND 7 THEN total_energy_Wh * 1.73 / 1000 WHEN hour_of_day_kyiv BETWEEN 8 AND 11 OR hour_of_day_kyiv BETWEEN 20 AND 22 THEN total_energy_Wh * 6.48 / 1000 ELSE total_energy_Wh * 4.32 / 1000 END AS cost_3_zone FROM `_your_project_name._dataset_name_._hourly_energy_consumption_kyiv`;
Виводимо вартість за обчислювальний період:
SELECT sum(cost_1_zone), sum (cost_2_zone), sum(cost_3_zone) FROM `_your_project_name._dataset_name_._tree_zones_tariff_hourly_kyiv_total`;
Або красиву візуалізацію:
SELECT hour_of_day_kyiv, cost_1_zone, cost_2_zone, cost_3_zone FROM `_your_project_name._dataset_name_._tree_zones_tariff_hourly_kyiv_total` order by hour_of_day_kyiv asc;
Як бачимо, не всі тарифи однаково корисні для пересічного домогосподарства. Водночас до питання зміни тарифу за поточних цін слід підходити з урахуванням, що електроенергія не буде дешевшати. Якщо тариф змінюється за бажанням споживача, витрати на заміну окупляться за поточних цін на електроенергію і поточним профілем споживання приблизно через півтора року. Двозонний тариф вигідний у будь-якому випадку, тоді як тризонний тариф може бути корисним не для всіх, оскільки передбачає знижені ціни вночі, але підвищені — в години пікового навантаження.
Висновки
Ми зробили майже покрокову інструкцію, як підключити «розумний» лічильник до домогосподарства, налаштувати його на передачу даних до хмарних сервісів та зберігати зібрану інформацію.
Накопичені дані дозволили швидко визначити максимальну потужність споживання, сформувати профіль навантаження домогосподарства і зробити обґрунтовані висновки щодо резервного живлення з урахуванням бюджету і поставлених цілей.
Отримані дані дозволили нам зробити розрахунок споживання домогосподарства по годинах доби залежно від обраного для аналізу інтервалу часу спостереження. Наклавши на погодинну розгортку поточні тарифи на електроенергію, ми змогли оцінити перспективність використання багатозонової тарифікації. Це не додасть електроенергії в мережу, але потенційно дозволить платити за спожите менше. Знов-таки, ваші розрахунки можуть відрізнятись від наведених у публікації.
Залишаються певні сумніви щодо правильності підходу до обчислення спожитої електроенергії по відмітках потужності споживання. Запрошую до обговорення та оптимізації.
На момент покупки розумний лічильник коштував близько 80 євро (доставка DHL), але зараз його можуть не відправляти в Україну. В такому разі можна скористатися сервісом Nova Poshta Shopping або аналогами.
Вартість використання хмарних сервісів AWS за січень — 1,99 долара, очікуваний кошторис до кінця місяця — 2,82 долара. GCP поки що не протарифікований, але оновні витрати будуть за зберігання даних у GCP Bucket та виконання SQL-запитів у Big Query. Очікується орієнтовно
Після накопичення більшого діапазону даних планую розглянути альтернативний підхід до використання інвертора та акумуляторів у домогосподарстві: заряджати АКБ вночі від мережі, а вдень використовувати накопичену енергію для живлення будинку. Якщо домогосподарство вже купило обладнання для резервного і в міру комфортного живлення, варто оцінити терміни окупності такої стратегії.
Ще планую отримати дані з Photovoltaic Geographical Information System щодо інсоляції та можливої генерації від сонячних панелей. Сумістити їх з даними про споживання. Власна маленька балконна СЕС вже давно не дає мені спокою. Можливо, це дозволить споживати менше електроенергії з мережі й термін окупності буде не дуже великим.
Імовірно, ми вийдемо за межі звичайних SQL-записів у Big Query і навіть перейдемо до pySpark або інших інструментів.
32 коментарі
Додати коментар Підписатись на коментаріВідписатись від коментарів