Materiál k cvičení DBM1 týden #7, sezona 2024/2025

Zpracování dat ve formě snapshotů aktuálního stavu

Dnes se budeme zabývat specifickým typem dat. Uvažujme veřejně dostupná data např. žebříčku nejlepších hráčů v nějaké hře, obsahující aktuální data. Tato data se ze své podstaty každý moment mohou změnit. Pokud s nimi chceme pracovat, jednou z variant je pravidelně data stahovat a ukládat ve formě snapshotů s časovým razítkem.

Taková data je možná analyzovat z pohledu vývoje, sledovat změny v čase, nebo např. počítat aktivitu hráčů.

📄 Stormgate leaderboard snapshoty

Data byla stažena z veřejného API a obsahují půlnoční snapshoty tabulky top 500 hráčů.

Načtení dat

Prvním krokem je načtení dat ze všech souborů. Skript níže nalezne všechny soubory leaderboard-00001.json v zadaném adresáři. Nejprve jednotlivé řádky uloží do seznamu a následně vytvoří DataFrame.

import pandas as pd import json from pathlib import Path # Path to the root folder shown in your screenshot root_path = Path("data/SG S1 data") # Recursively find all leaderboard-00001.json files json_files = list(root_path.rglob("leaderboard-00001.json")) # Collect all rows from all snapshots records = [] for file in json_files: with open(file, encoding='utf-8') as f: content = json.load(f) snapshot_date = content["meta"]["dateCrawled"] for row in content["data"]: row["snapshot_date"] = snapshot_date records.append(row) # Convert to DataFrame df = pd.DataFrame(records) print(df.head())

Všimněte si, že jsou použity všechny údaje z klíče data a následně je přidán sloupec snapshot_date s časovým razítkem získaným v klíči meta.dateCrawled.

Q1 jaký význam má momentálně jedna řádka v DataFrame?

Tabulka faktů a dimenzí

Nyní sestavíme tabulku faktů (záznamy z žebríčku).

# STEP 1: Prepare base DataFrame # technically not needed, if kept in order during loading df['snapshot_date'] = pd.to_datetime(df['snapshot_date']) df = df.sort_values(['snapshot_date', 'points'], ascending=[True, False]) # STEP 2: Generate rank per snapshot df['rank'] = df.groupby('snapshot_date')['points'].rank(method='first', ascending=False).astype(int) # STEP 3: Build FACT TABLE fact_cols = ['profileId', 'snapshot_date', 'rank', 'points', 'wins', 'losses', 'ties', 'mmr', 'tier', 'league', 'race'] fact_df = df[fact_cols].copy() print(fact_df.head())

Během sestavování tabulky faktů byl vytvořen nový sloupec rank, který určuje pořadí hráčů v daném snapshotu. Tato informace je implicitně obsažena v datech (pořadí záznamů v JSON Array), ale není explicitně v žádném klíči.

Použití rank(method='first') zajistí, že pokud dva hráči mají stejný počet bodů, bude mít lepší hodnotu ranku ten, který byl načten dříve.

Q2 Co by bylo považováno za primární klíč tabulky faktů?

Jako dimenzi můžeme vystavět data o hráčích a jejich jménech. Jelikož se jména mohou měnit (profileId je konstantní), budeme udržovat i historii změn.

# STEP 4.1: Subset relevant fields name_df = df[['profileId', 'playerName', 'snapshot_date']].copy() # STEP 4.2: Drop duplicates in time order (to track name transitions) name_df = name_df.sort_values(['profileId', 'snapshot_date']) name_df = name_df.drop_duplicates(['profileId', 'playerName'], keep='first') # STEP 4.3: Add valid_from and valid_to columns name_df['valid_from'] = name_df['snapshot_date'] name_df['valid_to'] = name_df.groupby('profileId')['valid_from'].shift(-1) name_df['current_flag'] = name_df['valid_to'].isna() # Reorder players_dim = name_df[['profileId', 'playerName', 'valid_from', 'valid_to', 'current_flag']] changed_names = players_dim[~players_dim['current_flag']] print(changed_names.head())

Nejprve si půjčíme pouze relevantní sloupce a seřadíme je podle profileId a snapshot_date. Následně odstraněním duplikátů a zachováním pouze prvních záznamů v čase získáme informaci o tom, kdy prvně bylo zaznamenáno příslušné jméno.

Zavedeme sloupce valid_from a valid_to, které určují platnost jména v daném časovém období. Sloupec current_flag označuje aktuální jméno.

V kódu je použit "trik" s shift(-1), který vytvoří posunutou kopii sloupce valid_from o jeden řádek dolů. Tím získáme valid_to pro každý záznam, pokud je novější jméno.

Pro ověření správnosti vypisuji prvních pár řádků, kde došlo ke změně jména.

Nuance: Předpokládá se, že nedojde k opakování historického jména. V takovém případě by bylo nutné významně upravit kód/logiku.

Typy pomalu měnících se dimenzí

Pomalu měnící se dimenze (SCD, slowly changing dimensions) jsou dimenze, které se mění v čase. V tomto příkladu se jedná o tabulku name_df, kde se může měnit jméno libovolného hráče v nepravidelných intervalech. SDC se dělí na 4 (až 6) typy dle zvolené strategie:

V našem případě je implementován Type 2 pomocí sloupců valid_from, valid_to a current_flag.

Q3 Jak byste implementovali Type 1 SCD pro změny jména hráče?

Q4 Jakou ne/výhodu má naše implementace? Jaké extra možnosti toto uchování historie přináší?

Výpočet změn mezi snapshoty

Nyní vyřešíme to zajímavé u snapshot dat - jak propočítat změny mezi jednotlivými časovými body a jak informaci uchovat.

# Make sure it's sorted by player, race, and snapshot_date fact_df = fact_df.sort_values(['profileId', 'race', 'snapshot_date']) fact_df['games_played'] = fact_df[['wins', 'losses', 'ties']].sum(axis=1) # Lag metrics per player and race fact_df['prev_points'] = fact_df.groupby(['profileId', 'race'])['points'].shift(1) fact_df['prev_mmr'] = fact_df.groupby(['profileId', 'race'])['mmr'].shift(1) fact_df['prev_games'] = fact_df.groupby(['profileId', 'race'])['games_played'].shift(1) fact_df['prev_snapshot'] = fact_df.groupby(['profileId', 'race'])['snapshot_date'].shift(1) fact_df['next_snapshot'] = fact_df.groupby(['profileId', 'race'])['snapshot_date'].shift(-1) # Now calculate the difference in metrics (points, mmr, etc.) fact_df['points_diff'] = fact_df['points'] - fact_df['prev_points'] fact_df['mmr_diff'] = fact_df['mmr'] - fact_df['prev_mmr'] fact_df['games_played_diff'] = fact_df['games_played'] - fact_df['prev_games'] max_date = fact_df['snapshot_date'].max() # Detect new players fact_df['is_new'] = fact_df['prev_snapshot'].isna() | ( (fact_df['snapshot_date'] - fact_df['prev_snapshot']).dt.days > 1 ) # Detect dropouts fact_df['is_dropout'] = ( (fact_df['next_snapshot'].notna() & (fact_df['next_snapshot'] - fact_df['snapshot_date']).dt.days > 1) | (fact_df['next_snapshot'].isna() & (fact_df['snapshot_date'] != max_date)) )

V kódu nejprve využijeme shift(1) a shift(-1) pro vytvoření nových sloupců, které obsahují hodnoty z předchozího a následujícího snapshotu. Následně vypočítáme rozdíly mezi jednotlivými metrikami.

Podobná fuknce jako shift je dostupná i v SQL pomocí window funkcí (např. LAG).

Komplikovanější je řešení výpočtu, zda se jedná o nového hráče v žebříčku, případně jestli je to poslední působení hráče v žebříčku. Zde je použita logika na základě rozdílu v čase mezi snapshoty (jsou denní, tedy rozdíl dnů > 1) a dle neexistence následujícího nebo předchozícho snapshotu.

Dle charakteristiky dat jsou některé výpočty stabilní, tedy s novými daty neměnné a některé se mění (next_snapshot, is_dropout).

Samotnou otázkou k zamyšlení je, zda odvozená data přidávat do tabulky faktů, nebo je udržovat v jiné tabulce (např. historie změn, aktivity, či příchodu/odchodu hráčů).

Q5 Co by znamenalo, kdyby games_played_diff bylo záporné? Jaké situace by to mohlo reprezentovat?

Vlastní dimenzní tabulky

Při modelování typicky dochází k přidání další informace o dimenzích, které nepocházejí z původního zdroje dat. Například v našem případě lze dodat dimenzi obsahující informaci o tom, kdy byl ve hře nasazen balance patch. To by mohlo při analýze aktivity hráčů v různých obdobích být významné, ale není to informace, kterou bychom mohli získat z původních dat.

Odvozená fakta, denormalizace

Formálně se v klasických produkčních relačních databázích hovoří o normálních formách, které se snaží o minimalizaci redundance dat. V analytických databázích (data warehouse) je naopak častá denormalizace, kdy se data z různých tabulek spojí do jedné pro zjednodušení analýzy.

V našem případě jsme zavedli položky jako rank nebo games_played, které by bylo možné dopočítat z původních dat. Pro analýzu je ale často výhodné mít tato data přímo k dispozici bez nutnosti jejich opakovaného výpočtu. Obzvláště pokud se jedná o složitější výpočty, nebo pokud chceme data používat v jiných prostředích, které takové výpočty triviálně nepodporují.

V původních datech již také byla redundance zavedená - league a tier jsou odvozené od points. Nicméně samotný datový endpoint to takto předává pro zjednodušení zpracování.

Sekundární výpočty rozdílů metrik zase mohou sloužit při analýzách jako vlastní dimenze. Například můžeme filtrovat nové hráče, nebo aktivní hráče v daný den.

Dimenze pro rozsahy intervalů

Poměrně často se při analýzách pracuje s hraničními číselnými hodnotami, které rozdělují obor hodnot na významové intervaly. V řešeném případě se toto vyskytuje přirozeně v převodu bodového hodnocení points na ligové úrovně league + tier.

Další situací by mohlo být například kategorizování aktivity hráčů do kohort podle počtu odehraných her v jeden den. Tedy například hráči s 0-5 her, 6-10 her, 10-20 her, 20+ her v jeden den.

V takových případech je vhodné vytvořit dimenzi, která bude obsahovat definici intervalů a případně i popisný název. Tato dimenze se následně propojí s faktovou tabulkou.

Můžeme zkusit vytvořit takovou dimenzi pro league a tier na základě dat:

rank_dim = fact_df.groupby(['league', 'tier'])['points'].agg(['min', 'max']).reset_index() rank_dim = rank_dim.sort_values(['max']) print(rank_dim)

Nicméně po prozkoumání vidíme, že jsme nezískali přesné hranice kvůli nedostatku dat. Například přechod mezi Master1 a Master2 je nejasný. Bylo by tedy potřeba manuálně doplnit hodnoty, nebo použít jiný přístup. Zároveň maximální hodnota (Master1) by měla obsahovat nekonečně vysokou horní hranici, protože je to otevřený systém.

Výběr intervalu stahování snapshotů

U tohoto use-case, resp. typu zpracování dat není triviální určit, jak často stahovat snapshoty. Zde je několik otázek k zamyšlení:

Kromě toho je také potřeba zvážit i kontextualizaci dat. Například pokud bychom chtěli sledovat denní aktivitu hráčů ve hře, která má globální charakter, je potřeba vzít v úvahu časové pásmo a případně i denní cyklus aktivity hráčů.

Q6 Pokud bychom chtěli dělat přehledy o aktivitě v jednotlivých dnech, v jaký čas byste stahovali snapshoty? Na základě čeho byste určili vhodný čas "přetočení" dne?

Zároveň je vhodné zvážit, zda se nechovat defenzivně a nestahovat více, než je potřeba. Například pokud nás zajímá denní aktivita, můžeme pro jistotu stáhnout data ještě hodinu před a po standardním čase přetočení dne. Tím máme alternativní blízký datapoint, pokud by došlo k výpadku API, nebo k chybě v datech.

Q7 Zkomplikovalo by častější nebo nepravidelné stahování dat nějakým způsobem analýzu, která zde byla naznačena?