Materiál k cvičení DBM1 týden #7, sezona 2024/2025
Dnes se budeme zabývat specifickým typem dat. Uvažujme veřejně dostupná data např. žebříčku nejlepších hráčů v nějaké hře, obsahující aktuální data. Tato data se ze své podstaty každý moment mohou změnit. Pokud s nimi chceme pracovat, jednou z variant je pravidelně data stahovat a ukládat ve formě snapshotů s časovým razítkem.
Taková data je možná analyzovat z pohledu vývoje, sledovat změny v čase, nebo např. počítat aktivitu hráčů.
📄 Stormgate leaderboard snapshoty
Data byla stažena z veřejného API a obsahují půlnoční snapshoty tabulky top 500 hráčů.
Prvním krokem je načtení dat ze všech souborů. Skript níže nalezne všechny soubory leaderboard-00001.json v zadaném adresáři. Nejprve jednotlivé řádky uloží do seznamu a následně vytvoří DataFrame.
import pandas as pd
import json
from pathlib import Path
# Path to the root folder shown in your screenshot
root_path = Path("data/SG S1 data")
# Recursively find all leaderboard-00001.json files
json_files = list(root_path.rglob("leaderboard-00001.json"))
# Collect all rows from all snapshots
records = []
for file in json_files:
with open(file, encoding='utf-8') as f:
content = json.load(f)
snapshot_date = content["meta"]["dateCrawled"]
for row in content["data"]:
row["snapshot_date"] = snapshot_date
records.append(row)
# Convert to DataFrame
df = pd.DataFrame(records)
print(df.head())
Všimněte si, že jsou použity všechny údaje z klíče data a následně je přidán sloupec snapshot_date s časovým razítkem získaným v klíči meta.dateCrawled.
Q1 jaký význam má momentálně jedna řádka v DataFrame?
Nyní sestavíme tabulku faktů (záznamy z žebríčku).
# STEP 1: Prepare base DataFrame
# technically not needed, if kept in order during loading
df['snapshot_date'] = pd.to_datetime(df['snapshot_date'])
df = df.sort_values(['snapshot_date', 'points'], ascending=[True, False])
# STEP 2: Generate rank per snapshot
df['rank'] = df.groupby('snapshot_date')['points'].rank(method='first', ascending=False).astype(int)
# STEP 3: Build FACT TABLE
fact_cols = ['profileId', 'snapshot_date', 'rank', 'points', 'wins', 'losses', 'ties', 'mmr', 'tier', 'league', 'race']
fact_df = df[fact_cols].copy()
print(fact_df.head())
Během sestavování tabulky faktů byl vytvořen nový sloupec rank, který určuje pořadí hráčů v daném snapshotu. Tato informace je implicitně obsažena v datech (pořadí záznamů v JSON Array), ale není explicitně v žádném klíči.
Použití rank(method='first') zajistí, že pokud dva hráči mají stejný počet bodů, bude mít lepší hodnotu ranku ten, který byl načten dříve.
Q2 Co by bylo považováno za primární klíč tabulky faktů?
Jako dimenzi můžeme vystavět data o hráčích a jejich jménech. Jelikož se jména mohou měnit (profileId je konstantní), budeme udržovat i historii změn.
# STEP 4.1: Subset relevant fields
name_df = df[['profileId', 'playerName', 'snapshot_date']].copy()
# STEP 4.2: Drop duplicates in time order (to track name transitions)
name_df = name_df.sort_values(['profileId', 'snapshot_date'])
name_df = name_df.drop_duplicates(['profileId', 'playerName'], keep='first')
# STEP 4.3: Add valid_from and valid_to columns
name_df['valid_from'] = name_df['snapshot_date']
name_df['valid_to'] = name_df.groupby('profileId')['valid_from'].shift(-1)
name_df['current_flag'] = name_df['valid_to'].isna()
# Reorder
players_dim = name_df[['profileId', 'playerName', 'valid_from', 'valid_to', 'current_flag']]
changed_names = players_dim[~players_dim['current_flag']]
print(changed_names.head())
Nejprve si půjčíme pouze relevantní sloupce a seřadíme je podle profileId a snapshot_date. Následně odstraněním duplikátů a zachováním pouze prvních záznamů v čase získáme informaci o tom, kdy prvně bylo zaznamenáno příslušné jméno.
Zavedeme sloupce valid_from a valid_to, které určují platnost jména v daném časovém období. Sloupec current_flag označuje aktuální jméno.
V kódu je použit "trik" s shift(-1), který vytvoří posunutou kopii sloupce valid_from o jeden řádek dolů. Tím získáme valid_to pro každý záznam, pokud je novější jméno.
Pro ověření správnosti vypisuji prvních pár řádků, kde došlo ke změně jména.
⚠ Nuance: Předpokládá se, že nedojde k opakování historického jména. V takovém případě by bylo nutné významně upravit kód/logiku.
Pomalu měnící se dimenze (SCD, slowly changing dimensions) jsou dimenze, které se mění v čase. V tomto příkladu se jedná o tabulku name_df, kde se může měnit jméno libovolného hráče v nepravidelných intervalech. SDC se dělí na 4 (až 6) typy dle zvolené strategie:
V našem případě je implementován Type 2 pomocí sloupců valid_from, valid_to a current_flag.
Q3 Jak byste implementovali Type 1 SCD pro změny jména hráče?
Q4 Jakou ne/výhodu má naše implementace? Jaké extra možnosti toto uchování historie přináší?
Nyní vyřešíme to zajímavé u snapshot dat - jak propočítat změny mezi jednotlivými časovými body a jak informaci uchovat.
# Make sure it's sorted by player, race, and snapshot_date
fact_df = fact_df.sort_values(['profileId', 'race', 'snapshot_date'])
fact_df['games_played'] = fact_df[['wins', 'losses', 'ties']].sum(axis=1)
# Lag metrics per player and race
fact_df['prev_points'] = fact_df.groupby(['profileId', 'race'])['points'].shift(1)
fact_df['prev_mmr'] = fact_df.groupby(['profileId', 'race'])['mmr'].shift(1)
fact_df['prev_games'] = fact_df.groupby(['profileId', 'race'])['games_played'].shift(1)
fact_df['prev_snapshot'] = fact_df.groupby(['profileId', 'race'])['snapshot_date'].shift(1)
fact_df['next_snapshot'] = fact_df.groupby(['profileId', 'race'])['snapshot_date'].shift(-1)
# Now calculate the difference in metrics (points, mmr, etc.)
fact_df['points_diff'] = fact_df['points'] - fact_df['prev_points']
fact_df['mmr_diff'] = fact_df['mmr'] - fact_df['prev_mmr']
fact_df['games_played_diff'] = fact_df['games_played'] - fact_df['prev_games']
max_date = fact_df['snapshot_date'].max()
# Detect new players
fact_df['is_new'] = fact_df['prev_snapshot'].isna() | (
(fact_df['snapshot_date'] - fact_df['prev_snapshot']).dt.days > 1
)
# Detect dropouts
fact_df['is_dropout'] = (
(fact_df['next_snapshot'].notna() &
(fact_df['next_snapshot'] - fact_df['snapshot_date']).dt.days > 1) |
(fact_df['next_snapshot'].isna() & (fact_df['snapshot_date'] != max_date))
)
V kódu nejprve využijeme shift(1) a shift(-1) pro vytvoření nových sloupců, které obsahují hodnoty z předchozího a následujícího snapshotu. Následně vypočítáme rozdíly mezi jednotlivými metrikami.
※ Podobná fuknce jako shift je dostupná i v SQL pomocí window funkcí (např. LAG).
Komplikovanější je řešení výpočtu, zda se jedná o nového hráče v žebříčku, případně jestli je to poslední působení hráče v žebříčku. Zde je použita logika na základě rozdílu v čase mezi snapshoty (jsou denní, tedy rozdíl dnů > 1) a dle neexistence následujícího nebo předchozícho snapshotu.
Dle charakteristiky dat jsou některé výpočty stabilní, tedy s novými daty neměnné a některé se mění (next_snapshot, is_dropout).
Samotnou otázkou k zamyšlení je, zda odvozená data přidávat do tabulky faktů, nebo je udržovat v jiné tabulce (např. historie změn, aktivity, či příchodu/odchodu hráčů).
Q5 Co by znamenalo, kdyby
games_played_diffbylo záporné? Jaké situace by to mohlo reprezentovat?
Při modelování typicky dochází k přidání další informace o dimenzích, které nepocházejí z původního zdroje dat. Například v našem případě lze dodat dimenzi obsahující informaci o tom, kdy byl ve hře nasazen balance patch. To by mohlo při analýze aktivity hráčů v různých obdobích být významné, ale není to informace, kterou bychom mohli získat z původních dat.
Formálně se v klasických produkčních relačních databázích hovoří o normálních formách, které se snaží o minimalizaci redundance dat. V analytických databázích (data warehouse) je naopak častá denormalizace, kdy se data z různých tabulek spojí do jedné pro zjednodušení analýzy.
V našem případě jsme zavedli položky jako rank nebo games_played, které by bylo možné dopočítat z původních dat. Pro analýzu je ale často výhodné mít tato data přímo k dispozici bez nutnosti jejich opakovaného výpočtu. Obzvláště pokud se jedná o složitější výpočty, nebo pokud chceme data používat v jiných prostředích, které takové výpočty triviálně nepodporují.
V původních datech již také byla redundance zavedená - league a tier jsou odvozené od points. Nicméně samotný datový endpoint to takto předává pro zjednodušení zpracování.
Sekundární výpočty rozdílů metrik zase mohou sloužit při analýzách jako vlastní dimenze. Například můžeme filtrovat nové hráče, nebo aktivní hráče v daný den.
Poměrně často se při analýzách pracuje s hraničními číselnými hodnotami, které rozdělují obor hodnot na významové intervaly. V řešeném případě se toto vyskytuje přirozeně v převodu bodového hodnocení points na ligové úrovně league + tier.
Další situací by mohlo být například kategorizování aktivity hráčů do kohort podle počtu odehraných her v jeden den. Tedy například hráči s 0-5 her, 6-10 her, 10-20 her, 20+ her v jeden den.
V takových případech je vhodné vytvořit dimenzi, která bude obsahovat definici intervalů a případně i popisný název. Tato dimenze se následně propojí s faktovou tabulkou.
Můžeme zkusit vytvořit takovou dimenzi pro league a tier na základě dat:
rank_dim = fact_df.groupby(['league', 'tier'])['points'].agg(['min', 'max']).reset_index()
rank_dim = rank_dim.sort_values(['max'])
print(rank_dim)
Nicméně po prozkoumání vidíme, že jsme nezískali přesné hranice kvůli nedostatku dat. Například přechod mezi Master1 a Master2 je nejasný. Bylo by tedy potřeba manuálně doplnit hodnoty, nebo použít jiný přístup. Zároveň maximální hodnota (Master1) by měla obsahovat nekonečně vysokou horní hranici, protože je to otevřený systém.
U tohoto use-case, resp. typu zpracování dat není triviální určit, jak často stahovat snapshoty. Zde je několik otázek k zamyšlení:
Kromě toho je také potřeba zvážit i kontextualizaci dat. Například pokud bychom chtěli sledovat denní aktivitu hráčů ve hře, která má globální charakter, je potřeba vzít v úvahu časové pásmo a případně i denní cyklus aktivity hráčů.
Q6 Pokud bychom chtěli dělat přehledy o aktivitě v jednotlivých dnech, v jaký čas byste stahovali snapshoty? Na základě čeho byste určili vhodný čas "přetočení" dne?
Zároveň je vhodné zvážit, zda se nechovat defenzivně a nestahovat více, než je potřeba. Například pokud nás zajímá denní aktivita, můžeme pro jistotu stáhnout data ještě hodinu před a po standardním čase přetočení dne. Tím máme alternativní blízký datapoint, pokud by došlo k výpadku API, nebo k chybě v datech.
Q7 Zkomplikovalo by častější nebo nepravidelné stahování dat nějakým způsobem analýzu, která zde byla naznačena?