Commit 0e1f2c86 authored by Gerion Entrup's avatar Gerion Entrup
Browse files

collector: refactoring

- lay out retrieval logic and database access to retrieval module
- add MediumFormat again (gid is faked though)
parent 1eb7cd4f
......@@ -2,13 +2,13 @@ import threading
import queue
import logging
import os.path
import retrieval
import musicbrainzngs
from sqlalchemy import and_
from mbdata.models import ArtistCredit, ArtistCreditName, Artist, Release, ReleaseGroup, Medium, Track, MediumFormat
import fetcher
from fetcher import Table
from retrieval import Entity, Filter, Table
from model import Recording
from utils import pairwise
......@@ -22,9 +22,9 @@ class Collector(threading.Thread):
"""
def __init__(self, session_fac, threaded=True):
retrieval.init(Collector._structure, session_fac)
if threaded:
super().__init__(target=self)
self._session = session_fac()
self._logger = logging.getLogger('collector')
@classmethod
......@@ -36,192 +36,183 @@ class Collector(threading.Thread):
def run(self):
while True:
mbid, path = Paths.get()
self._logger.info("Adding file " +
"{} to the database.".format(path))
try:
self._logger.info("Adding file " +
"{} to the database.".format(path))
self.fetch_recording(mbid, path)
retrieval.create(Table.recording, mbid, path)
retrieval.commit()
except musicbrainzngs.WebServiceError as exc:
self._session.rollback()
self._logger.error("Could not connect to Musicbrainz. " +
"Path: {} Request: {}".format(path, exc))
self._session.commit()
self._logger.error("Could not connect to Musicbrainz. "
"Path: {}, Request: {}".format(path, exc))
Paths.task_done()
self._session.close()
def create_artist_credit(self, acresult, acphrase):
ac = self._session.query(ArtistCredit).filter_by(name=acphrase).first()
if ac is None:
# non nullable attributes
ac = ArtistCredit()
ac.name = acphrase
acns = self.create_artist_credit_name(acresult, ac)
ac.artist_count = len(acns)
# nullable attributes
ac.ref_count = 1
self._session.add(ac)
for acn in acns:
self._session.add(acn)
else:
# nullable attributes
ac.ref_count += 1
return ac
def create_artist_credit_name(self, acresult, artistcredit):
acns = []
for artist, joinphrase in pairwise(acresult + ['']):
acn = ArtistCreditName()
# non nullable attributes
acn.artist_credit = artistcredit
acn.position = len(acns) + 1
acn.artist = self.fetch_artist(artist['artist']['id'])
if 'name' in artist:
# the recording uses not the standard artist name
acn.name = artist['name']
else:
acn.name = artist['artist']['name']
acn.join_phrase = joinphrase
acns.append(acn)
return acns
def fetch_artist(self, mbid):
artist = self._session.query(Artist).filter_by(gid=mbid).first()
if artist is None:
result = fetcher.get_table_by_id(mbid, Table.artist)
# non nullable attributes
artist = Artist()
artist.gid = result['id']
artist.name = result['name']
artist.sort_name = result['sort-name']
# add to db
self._session.add(artist)
return artist
def fetch_recording(self, mbid, path):
recording = self._session.query(Recording).filter_by(gid=mbid).first()
if recording is None:
result = fetcher.get_table_by_id(mbid, Table.recording)
# non nullable attributes
recording = Recording()
recording.gid = result['id']
recording.name = result['title']
recording.fgid = mbid
recording.path = path
recording.ftype = os.path.splitext(path)[1][1:]
recording.artist_credit = self.create_artist_credit(
result['artist-credit'], result['artist-credit-phrase'])
# nullable attributes
if 'length' in result:
recording.length = result['length']
print(recording)
self._session.add(recording)
# extended mapping
for releasedata in result['release-list']:
release = self.fetch_release(releasedata['id'])
# Find track in release.
# This is clearly a workaround and only works efficient
# because of caching. Correct way would be to fetch all tracks
# directly, but the musicbrainz api offers no way to do this.
mediumlist = fetcher.get_table_by_id(
releasedata['id'], Table.release)['medium-list']
for medium in mediumlist:
for track in medium['track-list']:
if track['recording']['id'] == result['id']:
self.create_track(track,
release,
recording,
medium)
return recording
def fetch_release(self, mbid):
release = self._session.query(Release).filter_by(gid=mbid).first()
if release is None:
result = fetcher.get_table_by_id(mbid, Table.release)
# non nullable attributes
release = Release()
release.gid = result['id']
release.name = result['title']
release.artist_credit = self.create_artist_credit(
result['artist-credit'], result['artist-credit-phrase'])
release.release_group = self.fetch_release_group(
result['release-group']['id'])
# nullable attributes
release.quality = self.create_quality(result['quality'])
self._session.add(release)
# extended mapping
self.create_medium(result['medium-list'], release)
return release
def create_quality(self, quality):
qualities = {'low': 1,
'normal': 2,
'high': 3}
return qualities[quality]
def create_medium(self, mediumdata, release):
mediums = []
for med in mediumdata:
medium = Medium()
medium.release = release
medium.position = med['position']
medium.track_count = len(med['track-list'])
# TODO report in bug, this requires an id
# if 'format' in med:
# medium.format = self.create_medium_format(med['format'])
mediums.append(medium)
self._session.add(medium)
return mediums
def create_medium_format(self, name):
mediumformat = self._session.query(MediumFormat).filter_by(name=name).first()
if mediumformat is None:
mediumformat = MediumFormat()
mediumformat.name = name
self._session.add(mediumformat)
return mediumformat
def create_track(self, trackdata, release, recording, mediumdata):
track = self._session.query(Track).filter_by(gid=trackdata['id']).first()
if track is None:
track = Track()
track.gid = trackdata['id']
track.position = trackdata['position']
track.number = trackdata['number']
print(recording)
track.recording = recording
track.name = recording.name
track.medium = self._session.query(Medium).filter(
and_(Medium.position == mediumdata['position'],
Medium.release == release)).one()
track.artist_credit = recording.artist_credit
track.artist_credit.ref_count += 1
if 'length' in trackdata:
track.length = trackdata['length']
self._session.add(track)
return track
def fetch_release_group(self, mbid):
rg = self._session.query(ReleaseGroup).filter_by(gid=mbid).first()
if rg is None:
result = fetcher.get_table_by_id(mbid, Table.release_group)
rg = ReleaseGroup()
rg.gid = result['id']
rg.name = result['title']
rg.artist_credit = self.create_artist_credit(
result['artist-credit'], result['artist-credit-phrase'])
self._session.add(rg)
return rg
def _release_mapping(release, result, mbid):
# non nullable attributes
release.gid = result['id']
release.name = result['title']
release.artist_credit = retrieval.create(Table.artist_credit,
result['artist-credit'],
result['artist-credit-phrase'])
release.release_group = retrieval.create(Table.release_group,
result['release-group']['id'])
# nullable attributes
if 'quality' in result:
release.quality = retrieval.map_quality(result['quality'])
# extended mapping
for medium in result['medium-list']:
retrieval.create(Table.medium, release, medium['position'], medium)
def _recording_mapping(recording, result, mbid, path):
# non nullable attributes
recording.gid = result['id']
recording.name = result['title']
recording.fgid = mbid
recording.path = path
recording.ftype = os.path.splitext(path)[1][1:]
recording.artist_credit = retrieval.create(Table.artist_credit,
result['artist-credit'],
result['artist-credit-phrase'])
# nullable attributes
if 'length' in result:
recording.length = result['length']
# extended mapping
for releasedata in result['release-list']:
release = retrieval.create(Table.release, releasedata['id'])
# Find track in release.
# This is clearly a workaround and only works efficient
# because of caching. Correct way would be to fetch all tracks
# directly, but the musicbrainz api offers no way to do this.
mediumlist = retrieval.get_table_by_id(
releasedata['id'], Table.release)['medium-list']
for medium in mediumlist:
for track in medium['track-list']:
if track['recording']['id'] == result['id']:
retrieval.create(Table.track, track['id'], release,
recording, medium, track)
def _artist_mapping(artist, result, mbid):
# non nullable attributes
artist.gid = result['id']
artist.name = result['name']
artist.sort_name = result['sort-name']
def _release_group_mapping(release_group, result, mbid):
release_group.gid = result['id']
release_group.name = result['title']
release_group.artist_credit = retrieval.create(Table.artist_credit,
result['artist-credit'],
result['artist-credit-phrase'])
def _artist_credit_mapping(artist_credit, acresult, acphrase):
# non nullable attributes
artist_credit.name = acphrase
# acresult is of type ["artist1", "feat.", "artist2"]
artist_credit.artist_count = (len(acresult) + 1) // 2
# nullable attributes
artist_credit.ref_count = 1
# extended mapping
for idx, (data, joinphrase) in enumerate(pairwise(acresult + [''])):
data['joinphrase'] = joinphrase
retrieval.create(Table.artist_credit_name,
artist_credit,
idx + 1, # SQL IDs begin with 1
data)
def _artist_credit_r_mapping(artist_credit, acphrase, acresult):
artist_credit.ref_count += 1
def _artist_credit_name_mapping(acn, artist_credit, position, data):
# non nullable attributes
acn.artist_credit = artist_credit
acn.position = position
acn.artist = retrieval.create(Table.artist, data['artist']['id'])
acn.name = data['name'] if 'name' in data else data['artist']['name']
acn.join_phrase = data['joinphrase']
def _medium_mapping(medium, release, position, medium_data):
medium.release = release
medium.position = medium_data['position']
medium.track_count = medium_data['track-count']
if 'format' in medium_data:
medium.format = retrieval.create(Table.medium_format,
medium_data['format'])
def _medium_format_mapping(medium_format, name):
medium_format.name = name
# This is clearly a workaround because the musicbrainz-API does not
# allow to retrieve the gid of the medium
medium_format.gid = retrieval.fake_id('mediumformat', name)
def _track_mapping(track, mbid, release, recording, medium_data, data):
track.gid = data['id']
track.position = data['position']
track.number = data['number']
track.recording = recording
track.name = recording.name
track.medium = retrieval.create(Table.medium, release,
medium_data['position'], medium_data)
track.artist_credit = recording.artist_credit
track.artist_credit.ref_count += 1
if 'length' in data:
track.length = data['length']
_structure = {
Table.release: Entity(parameter=('mbid',),
mb_class=Release,
query_filter=Filter(gid='mbid'),
web='mbid',
mapping=_release_mapping),
Table.recording: Entity(parameter=('mbid', 'path'),
mb_class=Recording,
query_filter=Filter(gid='mbid'),
web='mbid',
mapping=_recording_mapping),
Table.artist: Entity(parameter=('mbid',),
mb_class=Artist,
query_filter=Filter(gid='mbid'),
web='mbid',
mapping=_artist_mapping),
Table.release_group: Entity(parameter=('mbid',),
mb_class=ReleaseGroup,
query_filter=Filter(gid='mbid'),
web='mbid',
mapping=_release_group_mapping),
Table.artist_credit: Entity(parameter=('acresult', 'acphrase'),
mb_class=ArtistCredit,
query_filter=Filter(name='acphrase'),
web=None,
mapping=_artist_credit_mapping,
reverse_mapping=_artist_credit_r_mapping),
Table.artist_credit_name: Entity(parameter=('artist_credit',
'position',
'data'),
mb_class=ArtistCreditName,
query_filter=Filter(artist_credit='artist_credit',
position='position'),
web=None,
mapping=_artist_credit_name_mapping),
Table.medium: Entity(parameter=('release', 'position', 'medium_data'),
mb_class=Medium,
query_filter=Filter(release='release',
position='position'),
web=None,
mapping=_medium_mapping),
Table.medium_format: Entity(parameter=('name',),
mb_class=MediumFormat,
query_filter=Filter(name='name'),
web=None,
mapping=_medium_format_mapping),
Table.track: Entity(parameter=('mbid', 'release', 'recording',
'medium_data', 'data'),
mb_class=Track,
query_filter=Filter(gid='mbid'),
web=None,
mapping=_track_mapping)}
from retrieval.fetcher import get_table_by_id
from retrieval.entity import Entity, Filter, Table
from retrieval.helper import map_quality, fake_id
from retrieval.retrieval import Retrieval
__all__ = ['get_table_by_id', 'Table', 'Entity', 'Filter',
'create', 'init', 'commit',
'map_quality', 'fake_id']
_retrieval = False
def init(structure, session_fac):
global _retrieval
if not _retrieval:
_retrieval = Retrieval(structure, session_fac)
def create(table, *args):
global _retrieval
if not _retrieval:
raise("You have to call init before using create.")
return _retrieval.create(table, *args)
def commit():
global _retrieval
if not _retrieval:
raise("You have to call init before using create.")
return _retrieval.commit()
import enum
from collections import namedtuple
def dummy(*args, **kwargs):
pass
Entity = namedtuple('Entity', ['parameter', 'mb_class', 'query_filter',
'web', 'mapping', 'reverse_mapping'])
# set reverse_mapping default to dummy and make it optional
Entity.__new__.__defaults__ = (dummy, )
class Filter():
def __init__(self, **kwargs):
self._kwargs = kwargs
def get_kwargs(self):
return self._kwargs.copy()
class Table(enum.Enum):
artist = 1
artist_credit = 2
artist_credit_name = 3
medium = 4
medium_format = 5
recording = 6
release = 7
release_group = 8
track = 9
......@@ -3,20 +3,17 @@ import logging
import musicbrainzngs
import time
import threading
import enum
import settings
from retrieval.entity import Table
"""
Fetches the musicdata and caches them.
"""
class Table(enum.Enum):
recording = 0
release_group = 1
artist = 2
release = 3
web = [Table.recording, Table.release_group, Table.artist, Table.release]
musicbrainzngs.set_useragent("brainzfs", "0.1-alpha",
"https://git.finf.uni-hannover.de/Chrysops/brainzfs")
......@@ -25,7 +22,7 @@ _logger = logging.getLogger('collector.fetcher')
_lock = threading.Lock()
_cache = {}
for table in Table:
for table in web:
_cache[table] = {}
_cachedates = collections.deque()
......@@ -99,6 +96,8 @@ def get_table_by_id(mbid, tablename):
rules. Throws an musicbrainzngs.WebServiceError if anything goes
wrong.
"""
if tablename not in web:
raise("Error: no Web lookup possible for " + str(tablename))
with _lock:
if mbid not in _cache[tablename]:
# delay to follow musicbrainz rules
......
import hashlib
def map_quality(value):
qualities = {'low': 1,
'normal': 2,
'high': 3}
return qualities[value]
def fake_id(name, value):
h = hashlib.sha1((name + value).encode('utf-8')).hexdigest()
return '-'.join([h[0:8], h[8:12], h[12:16], h[16:20], h[20:32]])
import logging
import sqlalchemy
from retrieval import fetcher
from contextlib import contextmanager
@contextmanager
def session_scope(Session):
"""Provide a transactional scope around a series of operations."""
session = Session()
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.close()
class Retrieval():
def __init__(self, structure, Session):
self._logger = logging.getLogger('retrieval')
self._Session = Session
self._structure = structure
self._cache = {}
for key in structure.keys():
self._cache[key] = {}
def _check_state(self, objects):
state_ok = False
for obj in objects:
try:
i = sqlalchemy.inspect(obj)
state_ok |= i.persistent
except sqlalchemy.exc.NoInspectionAvailable:
# ignore non database objects
pass
return state_ok
def create(self, table, *args):
entity = self._structure[table]
assert(len(args) == len(entity.parameter))
# konstruct keys
kwargs = entity.query_filter.get_kwargs()
for key in kwargs:
kwargs[key] = args[entity.parameter.index(kwargs[key])]
cache_key = tuple(kwargs.values())
# search the cache if object exists
if cache_key in self._cache[table]:
rs = self._cache[table][cache_key]
entity.reverse_mapping(rs, *args)
return rs
# search the database if object exists
elif self._check_state(kwargs.values()):
with session_scope(self._Session) as session:
rs = session.query(entity.mb_class).filter_by(**kwargs).first()
if rs is not None:
entity.reverse_mapping(rs, *args)
return rs
obj = entity.mb_class()
# fetch the data from web
if entity.web is not None:
value = args[entity.parameter.index(entity.web)]
result = fetcher.get_table_by_id(value, table)
entity.mapping(obj, result, *args)
else:
entity.mapping(obj, *args)
self._cache[table][cache_key] = obj
return obj
def commit(self):
with session_scope(self._Session) as session:
for table in self._cache.values():
for entity in table.values():
session.add(entity)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment