Keeping data loading separate. Normal people want stats; fewer people want the "how did you get this." If you want to see how this used to happen, or see data for earlier years, see the old Data Loading Notebook.
Under the covers, this is mostly requests calls to fetch data, which is now JSON, so we can parse it with Python's built-in json library and use Pandas for data munging. So let's start with the imports.
%matplotlib inline
from IPython.display import display, HTML
import requests
import json
import pandas as pd
import numpy as np
from datetime import date, datetime, time
from os import path, mkdir
import re
When we can, we'll cache data. This is only partially for speed: checking in the data allows for repeatability if sources go away or change. And for some partial results, it's useful to publish data. There are others in the XPN community doing data analysis or just asking questions, so CSV files are nice to leave around.
cache_dir = './cache'
playlist_cache_dir = path.join(cache_dir, 'playlists')
a2z90s_cache_dir = path.join(cache_dir, 'a2z90s')
bestwomen_cache_dir = path.join(cache_dir, 'bestwomen')
musicbrainz_cache_dir = path.join(cache_dir, 'musicbrainz')
data_dir = './data'
for d in (cache_dir, playlist_cache_dir, a2z90s_cache_dir, bestwomen_cache_dir, data_dir, musicbrainz_cache_dir):
if not path.exists(d): mkdir(d)
def fetch_daily_playlist(day, cache_dir=None, verbose = False):
"""
Fetches the XPN playlist for a given date
Args:
day (datetime.date) : The day to fetch the playlist for
cache_dir (string) : Path to the cache directory, or None to avoid caching
Returns:
        DataFrame containing Artist, Title, and Album as strings, and Air Time as a Timestamp
"""
songs = pd.DataFrame(None, columns=['Artist', 'Title', 'Album', 'Air Time'])
    cache_file = None
    if cache_dir is not None:
cache_file = path.join(cache_dir, "%04d-%02d-%02d.csv" % \
(day.year, day.month, day.day))
if cache_file is not None and path.exists(cache_file):
songs = pd.read_csv(cache_file, encoding='utf-8')
songs['Air Time'] = pd.to_datetime(songs['Air Time'], errors='coerce')
if verbose: print "Got %d rows from %s" % (len(songs), cache_file)
else:
# example url
# https://origin.xpn.org/utils/playlist/json/2022-11-30.json
playlist_url = 'https://origin.xpn.org/utils/playlist/json/%s.json' %(day.isoformat())
page = requests.get(playlist_url)
if verbose: print "fetching %s returned status %s" % (day.isoformat(), page.status_code)
# return is a json array of playlist entries
# each playlist entry is a dict of
# - artist : artists name
# - song : song title
# - album : album title
# - timeslice : string containing an iso date with second granularity in Eastern time
# - image: url of album cover art
# - streamPreview : url of a short mp3 outtake from the track
#
        # Not all tracks are music. But shows like World Cafe or Echos
        # put the show name, bounded by vertical bars, in the artist field, so we can skip them easily enough
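        # a typical entry looks something like this (values here are made up):
        # {"artist": "Some Artist", "song": "Some Song", "album": "Some Album",
        #  "timeslice": "2023-12-05 08:03:21",
        #  "image": "https://...", "streamPreview": "https://..."}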
track_count = 0
for track in page.json():
if track["artist"][0] == '|':
# skip non-song show titles
continue
if verbose: print ("adding %s %s %s %s" % (track['artist'], track['song'], track['album'],
datetime.strptime(track['timeslice'],'%Y-%m-%d %H:%M:%S')))
            new_row = pd.DataFrame([{'Artist': track['artist'],
                                     'Title': track['song'],
                                     'Album': track['album'],
                                     'Air Time': datetime.strptime(track['timeslice'], '%Y-%m-%d %H:%M:%S')}])
            songs = pd.concat([songs, new_row], ignore_index=True)
if verbose: print "size = %d" % len(songs)
track_count += 1
        if verbose: print('added %d tracks' % track_count)
if cache_file is not None:
songs.to_csv(cache_file, index=False, encoding='utf-8')
            if verbose: print('wrote %d rows to %s' % (len(songs), cache_file))
return songs
def fetch_playlist(start, end, cache_dir=None):
"""
Fetch all the playlist entries for a range of time.
Args:
start (datetime.datetime) : The inclusive start time to fetch entries for
end (datetime.datetime) : The exclusive end time to fetch entries for
cache_dir (string) : path to the cache directory, or None to avoid caching
Returns:
        DataFrame containing Artist, Title, and Album as strings, and Air Time as a Timestamp
"""
songs = pd.DataFrame(None, columns=['Artist', 'Title', 'Album', 'Air Time'])
for day in pd.date_range(start.date(), end.date()):
        songs = pd.concat([songs, fetch_daily_playlist(day.date(), cache_dir)], ignore_index=True)
songs = songs[songs['Air Time'] >= start]
songs = songs[songs['Air Time'] < end]
# sometimes the playlist entries are duplicated
songs = songs.drop_duplicates(subset=['Artist', 'Title', 'Album'])
songs = songs.sort_values(by = 'Air Time')
return songs
Fetch all the playlists for the duration of the countdown, pulling from the local cache when possible.
# this is slightly awkward. Since the playlist stops in the evening and restarts in
# the morning, we can kind of treat it as multiple playlists.
women_day1 = fetch_playlist(datetime(2023, 12, 5, 8, 0), datetime(2023, 12,5,17,19), playlist_cache_dir)
women_day2 = fetch_playlist(datetime(2023, 12, 6, 8, 0), datetime(2023, 12,6,17,53), playlist_cache_dir)
women_day3 = fetch_playlist(datetime(2023, 12, 7, 8, 0), datetime(2023, 12,7,17,32), playlist_cache_dir)
women_day4 = fetch_playlist(datetime(2023, 12, 8, 8, 0), datetime(2023, 12,8,16,0), playlist_cache_dir)
women_day5 = fetch_playlist(datetime(2023, 12, 9, 10, 0), datetime(2023,12,9,18,0), playlist_cache_dir)
women_day6 = fetch_playlist(datetime(2023, 12, 10, 11, 0), datetime(2023,12, 10,15,0 ), playlist_cache_dir)
women_day7 = fetch_playlist(datetime(2023, 12, 11, 8, 0), datetime(2023, 12, 11, 17, 25), playlist_cache_dir)
women_day8 = fetch_playlist(datetime(2023, 12, 12, 8, 0), datetime(2023,12,12,17,54), playlist_cache_dir)
women_days = [women_day1, women_day2, women_day3, women_day4, women_day5, women_day6, women_day7, women_day8]
print "got %d rows" % reduce(lambda x,y: x + len(y), women_days, 0)
HTML(women_day1.head(5).to_html())
HTML(women_day8.tail(5).to_html())
# ninties = fetch_playlist(datetime(2022, 12, 1, 8, 0), datetime(2022, 12, 8, 8, 30),
# playlist_cache_dir)
# print "got %d rows" % len(ninties)
# ninties_extras = fetch_playlist(datetime(2022, 12, 8, 8, 30), datetime(2022, 12, 8, 11, 8),
# playlist_cache_dir)
# print "got %d rows" % len(ninties_extras)
Since we have air times, we can approximate durations by subtracting each track's air time from the next track's air time. There are a couple of catches with this: the countdown pauses for the Friday Free at Noon show, and the last track of each day has no following entry, so we need to pass an explicit end time.
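As a quick illustration of the basic arithmetic (the timestamps here are made up):
t1 = datetime(2023, 12, 5, 8, 0)   # one track airs at 8:00
t2 = datetime(2023, 12, 5, 8, 4)   # the next track airs at 8:04
print((t2 - t1).seconds // 60)     # so the first track ran roughly 4 minutes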
def estimate_durations(playlist, end_time=None):
"""
Estimate the song durations
Args:
playlist (DataFrame): playlist with minimally an 'Air Time' attribute
        end_time (datetime): end time of the playlist, or None if it's still going
Return:
modified DataFrame with 'Duration' attribute added.
"""
playlist['Duration'] = pd.Series([0 for x in range(len(playlist.index))], index=playlist.index)
previous = None
last_idx = None
for idx, row in playlist.iterrows():
        if previous is not None:
if row['Air Time'].date().weekday() == 4 and previous.hour == 11 and row['Air Time'].hour == 12:
# We just fell into a free at noon
playlist.loc[last_idx, 'Duration'] = 60 - previous.minute
else:
# just subtract this start from the previous
delta = row['Air Time'] - previous
                playlist.loc[last_idx, 'Duration'] = delta.seconds // 60
previous = row['Air Time']
last_idx = idx
# fixup the last row
if end_time is not None:
delta = end_time - playlist.loc[last_idx,'Air Time']
        playlist.loc[last_idx, 'Duration'] = delta.seconds // 60
return playlist
# ninties = estimate_durations(ninties, datetime(2022,12, 8, 8, 30))
# ninties_extras = estimate_durations(ninties_extras, datetime(2022, 12, 8, 11, 9))
women_day1 = estimate_durations(women_day1, datetime(2023,12,5, 17, 19))
women_day2 = estimate_durations(women_day2, datetime(2023, 12, 6, 17, 53))
women_day3 = estimate_durations(women_day3, datetime(2023, 12, 7, 17, 32))
women_day4 = estimate_durations(women_day4, datetime(2023, 12, 8, 16, 0))
women_day5 = estimate_durations(women_day5, datetime(2023, 12, 9, 18, 0))
women_day6 = estimate_durations(women_day6, datetime(2023, 12,10, 15, 0))
women_day7 = estimate_durations(women_day7, datetime(2023, 12, 11, 17, 25))
women_day8 = estimate_durations(women_day8, datetime(2023,12,12,17,54))
women = pd.concat([women_day1, women_day2, women_day3, women_day4, women_day5, women_day6, women_day7, women_day8], ignore_index=True)
For countdowns it's useful to have a column for position in the countdown. So, for example, in the 885 Greatest Songs by Women, the first track is number 885 and the last is number 1.
def assign_positions(playlist, size):
"""
    Assign positions to songs in a playlist
Args:
playlist (DataFrame): playlist with rows in order of Air Time
size (int): eventual size of the playlist
Return:
modified playlist with 'Position" appended
"""
return playlist.assign(Position=range(size, size -len(playlist), -1))
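A quick sanity check on a tiny, made-up playlist:
demo = pd.DataFrame({'Title': ['first song', 'second song', 'third song']})
print(assign_positions(demo, 885)['Position'].tolist())   # [885, 884, 883]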
women = assign_positions(women, 885)
This might be the lamest, simplest data augmentation, but these are mostly A to Z countdowns. Besides, nothing is ever really that simple: blanks and initial punctuation (e.g. 'Til) have shown up before.
def first_char(s):
    # return the first alphabetic character, upper-cased,
    # falling back to the literal first character if there isn't one
    for c in s:
        if c.isalpha():
            return c.upper()
    return s[0]
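A couple of quick checks, using made-up titles:
print(first_char("'Til Whenever"))   # 'T' -- skips the leading apostrophe
print(first_char(" Blank Start"))    # 'B' -- skips the leading blank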
# ninties = ninties.join(ninties.apply(lambda x: first_char(x['Title']), axis=1).to_frame('Letter'))
women = women.join(women.apply(lambda x: first_char(x['Title']), axis=1).to_frame('Letter'))
For the non-alphabetic leftovers, we'll use the first character as-is instead, without skipping past non-alphabetics.
# ninties_extras = ninties_extras.join(ninties_extras.apply(lambda x: x[1][0].upper(), axis=1).to_frame('First Character'))
Not sure how interesting this is, but the "should we include leading articles" question was the genesis of this effort back in 2016. Besides, it's easy.
from nltk.tokenize import RegexpTokenizer
custom_tokenize = RegexpTokenizer(r"[\w'\-]+|[^\w'\s\-]").tokenize
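A quick check that the custom tokenizer keeps contractions together rather than splitting them the way the default word tokenizer would (the title is just an example):
print(custom_tokenize("Don't Speak")[0])   # "Don't"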
# ninties = ninties.join(ninties.apply(lambda x: custom_tokenize(x['Title'])[0], axis=1).to_frame('First Word'))
# ninties_extras = ninties_extras.join(ninties_extras.apply(lambda x: custom_tokenize(x['Title'])[0], axis=1).to_frame('First Word'))
women = women.join(women.apply(lambda x: custom_tokenize(x['Title'])[0], axis=1).to_frame('First Word'))
For the 90s A-Z, the realtime list uses a single JSON request to get a list of songs, and it contains the publication year. This might just be a lot easier than using MusicBrainz. For the 885 Greatest Songs by Women, we'll need to use MusicBrainz below.
def fetch_years():
# the 90s specific playlist page has a json feed with years,
# so we can just fetch that as
# https://origin.xpn.org/countdown/2022/2022_90s_az.json
    # one catch: unlike the normal playlist, it adds escape characters on "specials",
    # for example "Don\\'t need no \\'cape here" rather than "Don't need no 'cape here"
#
# their data isn't much better than what I did myself.
# for the moment, drop anything outside 1990-1999.
years = pd.DataFrame(None, columns=['Artist', 'Title', 'Album', 'Year'])
az_url = 'https://origin.xpn.org/countdown/2022/2022_90s_az.json'
page = requests.get(az_url)
for track in page.json():
if len(track['releaseDate']) == 4:
release_year = int(track['releaseDate'])
if release_year < 1990 or release_year > 1999:
release_year = 0
else:
release_year = 0
        new_row = pd.DataFrame([{'Artist': track['artist'].replace("\\", ""),
                                 'Title': track['song'].replace("\\", ""),
                                 'Album': track['album'].replace("\\", ""),
                                 'Year': release_year}])
        years = pd.concat([years, new_row], ignore_index=True)
return years
# ninties = ninties.merge(fetch_years(), on = ['Artist', 'Title', 'Album'], how='left')
# ninties['Year'] = ninties['Year'].fillna(value=0).astype(int)
# print "Of %d 90s tracks, %d had valid dates and %d did not" % \
# (len(ninties), len(ninties[ninties['Year'] > 0]), len(ninties[ninties['Year'] == 0]))
# ninties_extras = ninties_extras.merge(fetch_years(), on = ['Artist', 'Title', 'Album'], how='left')
# ninties_extras['Year'] = ninties_extras['Year'].fillna(value=0).astype(int)
# print "Of %d 90s extra tracks, %d had valid dates and %d did not" % \
# (len(ninties_extras), len(ninties_extras[ninties['Year'] > 0]), len(ninties_extras[ninties_extras['Year'] == 0]))
MusicBrainz is a free online music database with an external XML web service, supported in Python via the musicbrainzngs library. I'd originally used it to get publication years for the 2016 countdown, but abandoned it in 2017 since the 2017 playlist page had lists by year. Since there's no year data in 2023's 885 Greatest Songs by Women, I'm bringing it back.
There are a couple of potential issues with querying MusicBrainz: the search interface is rate limited to about one request per second, so lookups are slow for long playlists, and artist names don't always match the names XPN uses, so some searches come back empty or find the wrong recording.
And when that all fails, I've been known to resort to just searching various music sites and manually updating the data.
One consequence is that publication year data will always lag a bit while the countdown is still running.
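For reference, the lookup code below assumes search results shaped roughly like this (the field names are the ones the code reads from musicbrainzngs; the values are made up):
# result = {'recording-list': [
#     {'title': 'Some Song',
#      'release-list': [{'title': 'Some Album', 'date': '1994-06-21'},
#                       {'title': 'A Reissue', 'date': '2005'}]},
#     ...]}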
def add_musicbrainz_data(playlist, min_year = 1900, cache_file = None):
"""
Add data from the musicbrainz database. Currently just first year of publication.
The input data frame should contain at least Title and Artist fields
and the resulting dataframe will have a new Year field.
    The cache file, if used, should have been generated by a previous run of
    this function. Using a cache is strongly encouraged, as the MusicBrainz
    search interface is rate limited to one search per second, so this can be
    very slow for large playlists.
Args:
playlist (Dataframe) : playlist to update
        min_year (int) : minimum year to consider
cache_file (string) : path to cache file
Returns:
Dataframe containing the augmented playlist
"""
import musicbrainzngs as mb
mb.set_useragent('xpn-a2z', '0.1','https://github.com/asudell/a2z')
# keep a list of artists named differently
# at MusicBrainz than XPN, so we can 'fix' them
artist_names = {
"R. E. M.": "REM",
"Run-DMC": "Run-D.M.C.",
"The Ramones": "Ramones"
}
# load the cache if we have one
if cache_file is not None and path.exists(cache_file):
years = pd.read_csv(cache_file, encoding='utf-8')
years = years.drop_duplicates()
else:
years = pd.DataFrame(None, columns=('Title','Artist', 'Year'))
augmented = playlist.merge(years, how = 'left')
# Lookup any unaugmented rows
new_mb_rows = []
for index, row in augmented[augmented['Year'].isnull()].iterrows():
if row['Artist'] in artist_names:
artist = artist_names[row['Artist']]
else:
artist = row['Artist']
result = mb.search_recordings(row['Title'],
artist = artist,
status = 'official',
strict = True,
limit = 25)
rel_year = None
for recording in result['recording-list']:
if recording['release-list']:
for release in recording['release-list']:
if 'date' in release and len(release['date']) > 0:
y = int(release['date'].split('-')[0])
if rel_year is None or rel_year > y:
if y >= min_year:
# assume years before 1900 are typos
rel_year = y
if rel_year is not None:
new_mb_rows.append([row['Title'], row['Artist'], rel_year])
new_years = pd.DataFrame(new_mb_rows, columns=('Title','Artist', 'Year'))
# if we found new data, resave the cache and rebuild the augmented data
if len(new_years) > 0:
        years = pd.concat([years, new_years], ignore_index=True)
years = years.drop_duplicates()
if cache_file is not None:
years.to_csv(cache_file, index=False, encoding='utf-8')
augmented = playlist.merge(years, how = 'left')
return augmented
women = add_musicbrainz_data(women, 1900, path.join(musicbrainz_cache_dir, 'women_years.csv'))
# pandas doesn't support NAs in int data, so set missing years to 0
women['Year'] = women['Year'].fillna(0).astype(int)
# save a copy of anything without a year for manual review
women_missing = women[women['Year'] == 0][['Title', 'Artist']]
women_missing.to_csv(path.join(musicbrainz_cache_dir, 'women_need_years.csv'),
index=False, encoding='utf-8')
HTML(women.head(5).to_html())
HTML(women.tail(5).to_html())
# ninties_data_file = path.join(data_dir, '90sA2Z.csv')
# ninties.to_csv(ninties_data_file, index=False, encoding='utf8')
# ninties_extras_data_file = path.join(data_dir, '90sextras.csv')
# ninties_extras.to_csv(ninties_extras_data_file, index=False, encoding='utf8')
women_data_file = path.join(data_dir, 'women.csv')
women.to_csv(women_data_file, index=False, encoding='utf8')
The code for this project is in my GitHub repo, and this file specifically is Dataloading2.
This project is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License. You are free to use it for commercial or non-commercial purposes, so long as you attribute the source and also allow sharing.