Пошук спільних подорожей з просторово-часовим індексуванням
Привіт, я — Андрій Забавський, Data & Analytics Solutions Architect на SoftServe, займаюся переважно проектуванням та побудовою Big Data & Analytics рішень, а ще дуже люблю моделювання даних.
Кілька місяців тому до мене звернулися з проханням допомогти у вирішенні однієї цікавої задачі: визначення «дружніх» відносин між людьми на основі лише координат їх переміщень.
Ця задача не має швидких і готових рішень, а натомість відкриває широкий простір для творчості. Саме такий творчий підхід, пов’язаний із моделюванням та пошуком оптимальних структур даних — моя улюблена справа :)
Усі головні напрацювання — у статті. Не вважаю себе великим експертом у геопросторовій аналітиці, тому, якщо я упустив щось важливе — не судіть строго.
Постановка проблеми. Спільні подорожі
У сфері аналізу геопросторових даних існує цікаве завдання, яке можна назвати «Спільні подорожі». Воно передбачає виявлення випадків, коли об’єкти рухаються спільно, або, більш формально, коли двоє чи більше об’єктів переміщуються в безпосередній близькості один від одного протягом певного часу. Ця проблема є типовою для аналізу даних про рух, які можуть надходити з різних джерел, як-от GPS-трекери, мобільні телефони чи телеметричні системи транспортних засобів.
Ключові аспекти:
- Обсяг і складність даних: набори геопросторових даних, особливо ті, що відстежують рух у часі, можуть бути великими та складними.
- Синхронізація у часі: сутності, які подорожують разом, повинні мати траєкторії, синхронізовані в часі.
- Просторова близькість: визначення «спільної подорожі» вимагає встановлення порогових значень для просторової близькості.
Інструменти на ринку
Наразі на ринку немає інструментів, які б вирішили цю проблему з коробки. Однак існує багато розвинутих баз даних, як реляційних, так і NoSQL, які пропонують підтримку типів геопросторових даних, індексів і вбудованих функцій для обчислення відстані, пошуку близькості та просторового об’єднання.
Геопросторове індексування
Існують різні форми геопросторової індексації, такі як R-дерева, квадро-дерева, grid-індекси, криві заповнення простору та геохеші.
Давайте коротко розглянемо два приклади просторових індексів, які широко використовуються сьогодні:
- Шестикутники Uber H3.
- Bing Tiles від Microsoft.
Обидва індекси рекурсивно ділять простір на частини, H3 — на шестикутники, а Bing Tiles — на квадрати.
H3 пропонує кращий баланс з погляду фізичного розміру та керування сусідами.
Bing Tiles краще реалізовує належність дітей до батьків — точка, яка належить дочірній комірці, завжди належить і до батьківської комірки, що не завжди правда для H3-індексу.
Amazon Athena, використовуючи Trino як SQL-рушій, надає вбудовану підтримку Bing Tiles.
Саме тому методологія, описана нижче, базується на Bing Tiles. Якщо ви віддаєте перевагу іншій технології з підтримкою H3, можете адаптувати цю методологію для використання H3.
Демо-дані: Geolife GPS Trajectory Dataset
Розгляньмо методологію, використовуючи демо-дані. Хорошим прикладом для цієї мети є набір даних траєкторії GPS, зібраний у рамках проекту Geolife компанією Microsoft Research Asia, у якому брали участь 182 учасники протягом 5 років.
Траєкторії GPS цього набору даних представлені послідовністю точок із мітками часу, кожна з яких містить інформацію про широту та довготу. 91% траєкторій реєструються в щільному представленні, напртклад, кожні 1~5 секунд або кожні 5~10 метрів на точку.
Структура вхідних даних включає набір геоточок, які також містять ідентифікатори об’єктів та позначки часу:
CREATE EXTERNAL TABLE geolife_input( object_id string, lat float, long float, ts timestamp ) STORED AS PARQUET LOCATION 's3://geotrajectories-blog/geolife_input/' tblproperties ("parquet.compression"="SNAPPY");
Дані всередині виглядають так:
У додатку наприкінці статті є інструкції, де знайти адаптований набір даних і як почати його використовувати.
Пропозиція рішення
Технології: Amazon Athena як система розподілених запитів
Один із варіантів імплементації — використання комбінації масштабованих і ефективних механізмів зберігання даних та запитів.
На AWS-хмарі такий набір може виглядати наступним чином:
- AWS S3 — економічне, розподілене сховище файлів, здатне зберігати великі обсяги даних.
- Формат файлу Parquet із поколонковим зберіганням даних — оптимальний вибір для аналітичних запитів.
- Amazon Athena — зріла система розподілених запитів на основі Trino, яка забезпечує надійну підтримку геопросторових функцій.
Мислення множинами (Thinking in Sets)
Запропонований підхід базується на парадигмі «Мислення множинами», запропонованій Джо Селко для роботи з даними з використанням SQL:
- Множина в центрі. Замість того, щоб обробляти елементи окремо, працюйте з цілими множинами (наборами елементів) одночасно.
- Уникнення циклів. у SQL операції слід виконувати з використанням логіки на основі множин.
- Використовуйте оператори SQL, призначені для роботи з множинами, як-от агрегатні та віконні функції.
Методологія. Крок 1 — Кластеризація
Застосування підходу «мислення множинами» для порівняння траєкторій вимагає від нас ефективного представлення їх як дискретних множин. Щоб досягти цього, наші густонаселені послідовності точок повинні бути розподілені та поміщені у кластери, що характеризуються двома основними вимірами:
- Інтервали часу.
- Геопросторові регіони.
Розглянемо, як саме варто виконати цю кластеризацію.
Кластеризація за часом
Візьмемо дещо вужчу задачу про переміщення — подорожі людей. Та врахуємо людські обмеження.
У переважній більшості випадків однохвилинні сегменти чудово підходять для групування наших геоточок у добре збалансовані діапазони.
За одну хвилину людина не може подолати величезну відстань. І навпаки, якщо людина перебуває в русі, наступної хвилини вона буде переміщена на відстань, яка є достатньою для представлення іншою геопросторовою коміркою.
Геопросторові комірки
Для оптимізації геопросторових операцій припустимо, що люди подорожують разом в одному транспортному засобі — автобусі чи літаку. Цей «транспортний засіб» ми можемо представити як квадрат, окреслений однією з геопросторових індексних комірок на основі сітки. У нашому випадку використаємо комірку Bing Tile, яку підтримує Athena.
Варто врахувати, що на певному рівні масштабування комірка Bing на екваторі більша, ніж на полюсах. Тим не менше, на практиці можна обрати рівень масштабування, який буде підходити для переважної більшості ситуацій.
Зупинімося на комірці Bing Tile із рівнем масштабування 22 як на основі для операцій порівняння. За такої роздільної здатності розмір комірки біля екватора становить приблизно 10 метрів, тоді як у більш північних регіонах, таких як Монреаль, Канада, — близько
SQL для кластеризації геоточок
У наведеному нижче фрагменті SQL-коду ви знайдете процес нормалізації. Окрім розподілу геоточок у кластери, ми також визначаємо відстань, яку об’єкти подолали за своїм маршрутом. Хоча це додає певний рівень складності коду, інформація про відстань буде дуже цінною на наступних етапах.
Окрім відстані, ми також обчислюємо часовий проміжок між двома наступними спостереженнями. Пізніше будемо використовувати цю інформацію для розрахунку швидкості руху та для визначення перерв між спостереженнями.
CREATE EXTERNAL TABLE geolife_geo_point_1min( object_id string, lat float, long float, ts_1min timestamp, bt_22 string, delta_distance_meters float, delta_seconds int ) STORED AS PARQUET LOCATION 's3://geotrajectories-blog/kaggle/geolife_geo_point_1min/' tblproperties ("parquet.compression"="SNAPPY"); insert into geolife_geo_point_1min with delta_distances as ( select object_id, lat, long, ts, date_trunc('minute', ts) as ts_1min, bing_tile_quadkey(bing_tile_at(lat, long, 22)) as bt_22, ST_Distance( to_spherical_geography(st_point(long, lat)), to_spherical_geography(st_point(lag(long) over (partition by object_id order by ts), lag(lat) over (partition by object_id order by ts)) ) ) as delta_distance_meters, date_diff('second', lag(ts) over (partition by object_id order by ts), ts) as delta_seconds from geolife_input ), buckets_numbering_n_deltas as ( select object_id, lat, long, ts_1min, bt_22, row_number() over (partition by object_id, ts_1min order by ts) as row_num, sum(delta_distance_meters) over (partition by object_id, ts_1min) as one_mi_delta_distance, sum(delta_seconds) over (partition by object_id, ts_1min) as one_mi_delta_seconds from delta_distances ) select object_id, lat, long, ts_1min, bt_22, one_mi_delta_distance, one_mi_delta_seconds from buckets_numbering_n_deltas where row_num = 1 order by object_id, ts_1min
Крок 2.1 — Виокремлення Сесій Руху
Наступним кроком буде розрахунок рухомих сесій. Спробуймо спочатку визначити критерії сесії, щоб мати чітке розуміння.
Критерії сесії руху
Загалом визначення рухомої сесії не є простим завданням. Нам потрібно встановити певні порогові значення, щоб визначити, що ми вважаємо рухом та що робити, наприклад, коли людина деякий час залишається в заторах під час руху містом.
Розглянемо наступні порогові значення на основі типової людської поведінки:
- Відстань між двома послідовними геоточками, що нормалізовані до хвилинних інтервалів, має бути більше 50 метрів. Це означає, що людина рухається зі швидкістю, що дорівнює або перевищує швидкість повільної ходьби.
- Якщо дані геоточок відсутні більше 5 хвилин, ми вважаємо сеанс перерваним і починаємо запис нового.
SQL для ідентифікації сесій
У наведеному нижче фрагменті коду SQL проводиться ідентифікація сесій та присвоєння унікальних ідентифікаторів.
Спершу визначаємо, коли починається нова сесія. Ми використовуємо раніше встановлені критерії: якщо середня швидкість падає нижче 50 метрів на хвилину, або є перерви в спостереженні даних понад 5 хвилин, починаємо нову сесію. Початок нової сесії позначається 1, а всі інші випадки позначаються 0.
Далі використовуємо техніку, за якої обчислюємо кумулятивну суму для поля isNewSession, при цьому кожен новий сеанс збільшує лічильник на 1. Цей підхід дозволяє нам ефективно генерувати session_id.
Нарешті відфільтровуємо сеанси тривалістю менше 5 хвилин.
CREATE EXTERNAL TABLE geolife_moving_sessions( object_id string, lat float, long float, ts_1min timestamp, bt_22 string, session_id bigint, avg_speed_km_per_hour float ) STORED AS PARQUET LOCATION 's3://geotrajectories-blog/kaggle/geolife_moving_sessions/' tblproperties ("parquet.compression"="SNAPPY"); insert into geolife_moving_sessions with SessionStart as ( select object_id, lat, long, ts_1min, bt_22, delta_distance_meters, case when delta_distance_meters /(delta_seconds/60.) < 50 or delta_seconds > 5 * 60 or row_number() over (partition by object_id order by ts_1min) = 1 then 1 else 0 end as isNewSession from geolife_geo_point_1min ) ,Session as ( select object_id, lat, long, ts_1min, bt_22, case when isNewSession = 1 then 0 else delta_distance_meters end as delta_distance_meters, sum(isNewSession) over (partition by object_id order by ts_1min) as session_id from SessionStart ) ,SessionAttributes as ( select object_id, lat, long, ts_1min, bt_22, session_id, sum(delta_distance_meters) over (partition by object_id, session_id) / 1000 * 60 / (date_diff('minute', min(ts_1min) over (partition by object_id, session_id), max(ts_1min) over (partition by object_id, session_id))+1 ) as avg_speed_km_per_hour, count(*) over (partition by object_id, session_id) as session_size from Session ) select object_id, lat, long, ts_1min, bt_22, session_id, avg_speed_km_per_hour from SessionAttributes where session_size >= 5
Крок 2.2 — Розкладання сесій руху на сегменти
Як тільки ми визначили сесії та зберегли їх у таблиці geolife_moving_sessions, можемо виконувати різні види аналізу, включно з запитами — щоб визначити, які об’єкти подорожували разом. Однак, такий аналіз буде все ще не дуже ефективний. Тому підемо далі і спробуємо провести ряд оптимізацій.
Щоб ідентифікувати об’єкти, що рухаються разом, ми повинні порівняти їхні рухомі сесії.
На даному етапі кожна сесія складається із множити точок. Спробуємо кожну множину точок, що належить сесії, запакувати у коробку певної форми. Далі, щоб знайти однакові сесії, спершу будемо шукати коробки однакової форми. А потім вже порівнюватимемо вміст цих коробок.
З погляду фізичної моделі кожну сесію, що складається з множини рядків, перетворимо на один рядок, що має додаткові атрибути, які характеризують «форму коробки».
Пропоную знову використати дві ключові осі для дискретизації: час і місцезнаходження. Почнемо з визначення часу.
Розділимо сеанси на годинні інтервали. У реальному житті тривалість більшості переміщень вкладається в годину-дві. Тому сегмент, що охоплює годину, у більшості випадків охопить усю сесію або її значну частину.
Щодо географічної області, підхід може містити ідентифікацію комірки Bing Tile, яка охоплює весь сеанс. Однак обрати відповідний рівень масштабування складно, оскільки область, охоплена різними сеансами, може суттєво відрізнятися залежно від швидкості руху.
Тому встановимо кілька діапазонів:
- Для повільніших видів діяльності, як-от ходьба, виберемо Bing Tile з рівнем масштабування 12, що покриває приблизно
7–10 кілометрів. Так протягом годинного інтервалу сегмент сеансу розподілиться між одним або двома сегментами. - Для велосипедистів і бігунів, які рухаються зі швидкістю до 25 км/год, використаємо Bing Tile з рівнем масштабування 10, що охоплює площу приблизно 39 км.
- Для міського транспорту, що працює на швидкості до 60 км/год, підійде Bing Tile з рівнем масштабування 9, що охоплює територію до 78 км.
- Для високошвидкісних потягів і літаків застосуємо рівні масштабування 7 і 6 відповідно.
Встановлюючи ці порогові значення, ми можемо розділити наші сесії на «грубі» шматки так, що кожна з них буде представлена одним або кількома сегментами.
Повертаючись до теми розкладання по коробках: ми запаковуємо дані однієї сесії не обов’язково в одну коробку, але намагаємося мінімізувати їхню кількість — не більше кількох.
Гео-часовий індекс траєкторії
Після визначення сегментів важливо ефективно зберегти маршрут для кожного сегмента.
Після процесу нормалізації ми отримуємо траєкторію, що складається з послідовності комірок Bing Tile на рівні масштабування 22 та відповідних хвилинних інтервалів.
Оскільки ми проводимо транспонування набору рядків в один рядок, нам потрібно якось запакувати цю множину, що позначає траєкторію, в одну комірку-значення. Для цього можемо використати тип даних масив.
Перетворюємо набір записів із геочасовими даними в представлення масиву:
Далі виникає питання про те, як ефективно зберегти пару, що складається з комірки Bing Tile і мітки часу, нормалізованої до хвилин, у межах масиву. Найбільш прямолінійним підходом було б зберігати конкатенацію значень. Але є способи зробити це більш ефективно.
Оптимізація збереження траєкторії
Щоб усунути надлишкову інформацію та зменшити обсяг пам’яті, можна застосувати такі оптимізації.
Кожен сегмент визначається коміркою Bing Tile вищого рівня, яка охоплює весь маршрут сегменту. Таким чином, записуючи маршрут всередині сегменту, ми можемо опустити префікси, спільні для всього сегменту, і зберегти лише унікальні суфікси.
Така сама рекомендація стосується часу: оскільки кожен сегмент визначається годиною, ми можемо зберігати лише хвилинні числа в межах години.
Тоді можемо застосувати таку послідовність перетворень:
- Перетворити рядкове значення суфікса Bing Tile на ціле число, враховуючи, що воно використовує систему кодування нумерації за основою 4 за допомогою функцій SQL Athena from_base.
- Об’єднати два числа (суфікс комірки Bing Tile і номер хвилин), помноживши перше число на 100 і додавши друге число.
Виконавши ці оптимізації, ми можемо зберегти геочасовий маршрут як масив цілих значень:
SQL для індексування траєкторій
У наведеному нижче фрагменті коду SQL ілюструється процес побудови індексів траєкторії, які залежать від швидкості руху. Зверніть увагу на два ключові моменти:
- Точність комірки Bing Tile, яка представляє геоточку, зменшено з 22 до 21 для об’єктів, що швидко рухаються. Це коригування зумовлене тим, що швидкорухомі об’єкти, як-от потяги чи літаки, зазвичай перевищують розмір ~10 метрів. Тому логічно використовувати для порівняння геопросторові комірки, які щонайменше вдвічі більші.
- Нам вдається запакувати індекс траєкторії в масив
32-бітних цілих чисел для сеансів зі швидкостями до 60 км/год. Для швидших сеансів:- Створюємо первинний індекс траєкторії з незначною втратою деталей (щоб помістити в
32-бітний масив). Цього достатньо для більшості випадків. - Додаємо стовпець [trajectory_full_precision] для збереження повної точності траєкторії в масиві <bigint>.
- Створюємо первинний індекс траєкторії з незначною втратою деталей (щоб помістити в
CREATE EXTERNAL TABLE geolife_moving_session_partition( object_id string, session_id bigint, session_hour timestamp, covering_bt string, partition_duration smallint, trajectory_main array<int>, trajectory_full_precision array<bigint> ) STORED AS PARQUET LOCATION 's3://geotrajectories-blog/kaggle/geolife_moving_session_partition/' tblproperties ("parquet.compression"="SNAPPY"); insert into geolife_moving_session_partition with SpeedAdjustements as ( select object_id, session_id, ts_1min, date_trunc('hour', ts_1min) as session_hour, case when avg_speed_km_per_hour < 6 then substring(bt_22, 1, 12) when avg_speed_km_per_hour >= 6 and avg_speed_km_per_hour < 25 then substring(bt_22, 1, 10) when avg_speed_km_per_hour >= 25 and avg_speed_km_per_hour < 60 then substring(bt_22, 1, 9) when avg_speed_km_per_hour >= 60 and avg_speed_km_per_hour < 200 then substring(bt_22, 1, 7) when avg_speed_km_per_hour >= 200 then substring(bt_22, 1, 6) end as covering_bt, case when avg_speed_km_per_hour < 6 then from_base(substring(bt_22,13),4)*100+extract(minute from ts_1min) when avg_speed_km_per_hour >= 6 and avg_speed_km_per_hour < 25 then from_base(substring(bt_22,11),4)*100+extract(minute from ts_1min) when avg_speed_km_per_hour >= 25 then from_base(substring(bt_22,10,12),4)*100+extract(minute from ts_1min) end as geotemporal_point_main, case when avg_speed_km_per_hour >= 60 and avg_speed_km_per_hour < 200 then from_base(substring(bt_22,8,14),4)*100+extract(minute from ts_1min) when avg_speed_km_per_hour >= 200 then from_base(substring(bt_22,7,15),4)*100+extract(minute from ts_1min) end as geotemporal_point_full_precision from geolife_moving_sessions ) select object_id, session_id, session_hour, covering_bt, date_diff('minute', min(ts_1min), max(ts_1min))+1 as partition_duration, array_agg(cast(geotemporal_point_main as int)) as trajectory_main, array_agg(cast(geotemporal_point_full_precision as bigint)) filter (where geotemporal_point_full_precision is not null) as trajectory_full_precision from SpeedAdjustements group by object_id, session_id, session_hour, covering_bt
Пошук подорожуючих разом
Нарешті, зберігаючи дані, як описано вище, ми можемо ефективно аналізувати та виявляти об’єкти, що подорожують разом.
Для набору даних траєкторії Geolife, який відносно невеликий, можемо провести відповідне порівняння для всіх сеансів в одному запиті:
select sp1.object_id, sp2.object_id, count(distinct sp1.session_id) as distinct_sessions from geolife_moving_session_partition sp1 inner join geolife_moving_session_partition sp2 on sp1.covering_bt = sp2.covering_bt and sp1.session_hour = sp2.session_hour where sp1.object_id < sp2.object_id and cardinality(array_intersect(sp2.trajectory_main, sp1.trajectory_main)) > 10 group by sp1.object_id, sp2.object_id
Наведений вище запит виконує порівняння всіх сегментів між собою, перевіряючи збіги на основі двох атрибутів, які визначають межі кожного розділу:
- Комірка покриття Bing Tile, яка містить усі геоточки в розділі.
- Година сеансу, яка діє як часова межа розділу.
Запит визначає кількість спільних геочасових точок у сегментах за допомогою функції cardinality. Ми вибираємо пари, які мають щонайменше 10 спільних геочасових точок, що вказує на те, що об’єкти подорожували разом принаймні 10 хвилин.
Підсумки
У статті описується методологія зберігання індексів траєкторії як масивів геочасових точок у цілочисельному форматі, що забезпечує гнучке, ефективне та масштабоване рішення проблеми пошуку об’єктів, що подорожують разом.
Основні компоненти цього рішення включають:
- Amazon Athena — сервіс із надійною підтримкою геопросторових функцій.
- Парадигма «Мислення множинами» для нормалізації та дискретизації геоточок.
- Використання рекурсивного індексування Bing Tile на основі геопросторової сітки.
- Використання типу даних масивів і операцій з масивами.
- Ретельно розроблена модель даних.
На основі методології цих інструментів було розроблено спеціальну систему індексування геочасових траєкторій.
Додаток
Якщо ви хочете використовувати наведені вище запити для демо даних geolife, потрібно виконати такі дії:
- Завантажити звідси зразки даних, які зберігаються як три великі файли CSV.
- Завантажити ці файли в папку S3.
- Завантажити дані у вхідну таблицю за допомогою наведеного нижче фрагмента коду, замінивши шлях S3 s3://geotrajectories-blog/kaggle-sample/ на той, який ви використовували для завантаження демо-даних.
CREATE EXTERNAL TABLE IF NOT EXISTS geolife_input_csv ( object_id string, lat float, long float, ts timestamp ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimple WITH SERDEPROPERTIES ( ' serialization.format ' = ', ', ' field.delim ' = ', ' ) LOCATION ' s3 :// geotrajectories - blog / kaggle - sample / ' TBLPROPERTIES (' has_encrypted_data '=' false ', ' skip.header.line.17 S c CREATE EXTERNAL TABLE geolife_input( object_id string, lat float, long float, ts timestamp ) STORED AS PARQUET LOCATION 's3://geotrajectories-blog/geolife_input/' tblproperties ("parquet.compression" = "SNAPPY"); insert into geolife_input select * from geolife_input_csv;
6 коментарів
Додати коментар Підписатись на коментаріВідписатись від коментарів