Commit 77a7e6a1 authored by Gerion Entrup's avatar Gerion Entrup
Browse files

fetcher: make it a module and use locking

fetcher will be used by two threads; making it a module
prevents two separate caches, and locking prevents simultaneous execution
parent 28c7c630
......@@ -11,10 +11,10 @@ from sqlalchemy.orm import Session
from musicbrainzngs import WebServiceError
from collections import namedtuple
import fetcher
from model import Recording
from mbdata.models import ArtistCredit, ArtistCreditName, Artist, Release, ReleaseGroup, Medium, Track, MediumFormat
from utils import pairwise
from fetcher import Fetcher
Paths = queue.Queue(maxsize=10)
NEW = 0; DEFECT = 1; UPDATE = 2
......@@ -29,7 +29,6 @@ class Collector(threading.Thread):
def __init__(self, session_fac):
super().__init__(target=self)
self._session = session_fac()
self.fetcher = Fetcher()
self.log = logging.getLogger('collector')
def run(self):
......@@ -123,7 +122,7 @@ class Collector(threading.Thread):
recording = self._session.query(Recording).filter_by(gid=mbid).first()
if recording is None:
try:
result = self.fetcher.get_table_by_id(mbid, 'recording')
result = fetcher.get_table_by_id(mbid, 'recording')
#minimal mapping
recording = Recording()
......@@ -143,7 +142,7 @@ class Collector(threading.Thread):
# this is clearly a workaround and only works efficient because of caching.
# correct way would be to fetch all tracks directly, but the musicbrainz api
# offers no way to do this.
mediumlist = self.fetcher.get_table_by_id(releasedata['id'], 'release')['medium-list']
mediumlist = fetcher.get_table_by_id(releasedata['id'], 'release')['medium-list']
for medium in mediumlist:
for track in medium['track-list']:
if track['recording']['id'] == mbid:
......@@ -159,7 +158,7 @@ class Collector(threading.Thread):
def fetch_release_group(self, mbid):
rg = self._session.query(ReleaseGroup).filter_by(gid=mbid).first()
if rg is None:
result = self.fetcher.get_table_by_id(mbid, 'release-group')
result = fetcher.get_table_by_id(mbid, 'release-group')
rg = ReleaseGroup()
......@@ -174,7 +173,7 @@ class Collector(threading.Thread):
def fetch_artist(self, mbid):
artist = self._session.query(Artist).filter_by(gid=mbid).first()
if artist is None:
result = self.fetcher.get_table_by_id(mbid, 'artist')
result = fetcher.get_table_by_id(mbid, 'artist')
artist = Artist()
artist.gid = result['id']
......@@ -188,7 +187,7 @@ class Collector(threading.Thread):
def fetch_release(self, mbid):
release = self._session.query(Release).filter_by(gid=mbid).first()
if release is None:
result = self.fetcher.get_table_by_id(mbid, 'release')
result = fetcher.get_table_by_id(mbid, 'release')
#minimal mapping
release = Release()
......
import collections
import logging
import musicbrainzngs
import time
import settings
class Fetcher:
    """Fetches music metadata from MusicBrainz and caches the results.

    The cache is bounded by entry count (settings.fetcher_cache_length,
    enforced on insert) and by entry age (settings.fetcher_cache_age,
    enforced by clean_cache()). Web requests are rate limited to one per
    second as required by the MusicBrainz API rules.
    """

    def __init__(self):
        self.logger = logging.getLogger('collector.fetcher')
        musicbrainzngs.set_useragent("brainzfs", "0.1-alpha",
                                     "https://git.finf.uni-hannover.de/Chrysops/brainzfs")
        # per-table mapping: mbid -> raw result dict from musicbrainzngs
        self._cache = {'recording': {},
                       'release-group': {},
                       'artist': {},
                       'release': {}}
        # insertion-ordered (timestamp_ms, tablename, mbid) triples, oldest first
        self._cachedates = collections.deque()
        # timestamp (ms) of the last web request, used for rate limiting
        self._time = 0

        # for debugging
        # from offline import CACHE
        # self._cache = CACHE

    def _time_ms(self):
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def _delay(self):
        """Sleep just long enough to keep >= 1 s between web requests."""
        diff = self._time_ms() - self._time
        if diff < 1000:
            time.sleep((1000 - diff) / 1000)
        self._time = self._time_ms()

    def _cache_append(self, tablename, mbid, result):
        """Store *result* in the cache, evicting the oldest entry when full."""
        self._cache[tablename][mbid] = result
        # BUG FIX: store the timestamp *value*; the original appended the
        # bound method object itself, so age comparisons were meaningless.
        self._cachedates.append((self._time_ms(), tablename, mbid))
        if len(self._cachedates) > settings.fetcher_cache_length:
            # distinct names so the method parameters are not shadowed
            _, old_table, old_mbid = self._cachedates.popleft()
            del self._cache[old_table][old_mbid]

    def clean_cache(self):
        """Drop every cached entry older than settings.fetcher_cache_age seconds."""
        # BUG FIX: call _time_ms(); the original subtracted from the method
        # object itself, which raises TypeError.
        threshold = self._time_ms() - settings.fetcher_cache_age * 1000
        # BUG FIX: guard against an empty deque (the original indexed
        # _cachedates[0] unconditionally -> IndexError).
        while self._cachedates:
            d_time, tablename, mbid = self._cachedates[0]
            if d_time >= threshold:
                break
            self._cachedates.popleft()
            del self._cache[tablename][mbid]

    def get_table_by_id(self, mbid, tablename):
        """fetch the data from musicbrainz. Asked the cache in transparent
        way, if data already available and handles the musicbrainz timing
        rules. Throws an musicbrainzngs.WebServiceError if anything goes
        wrong.
        """
        methods = {
            'recording': lambda mbid: musicbrainzngs.get_recording_by_id(mbid, includes=['releases', 'artists'])['recording'],
            'release-group': lambda mbid: musicbrainzngs.get_release_group_by_id(mbid, includes=['artist-credits'])['release-group'],
            'artist': lambda mbid: musicbrainzngs.get_artist_by_id(mbid)['artist'],
            'release': lambda mbid: musicbrainzngs.get_release_by_id(mbid, includes=['artists',
                                                                                     'media',
                                                                                     'recordings',
                                                                                     'release-groups'])['release']
        }
        if mbid not in self._cache[tablename]:
            # delay to follow musicbrainz rules
            self._delay()
            # request actual data
            result = methods[tablename](mbid)
            self._cache_append(tablename, mbid, result)
            self.logger.debug("asked web for {}".format(tablename))
        else:
            result = self._cache[tablename][mbid]
            self.logger.debug("asked fetcher for {}".format(tablename))
        return result
import collections
import logging
import musicbrainzngs
import time
import threading
import settings
"""
Fetches the musicdata and caches them.
"""
_logger = logging.getLogger('collector.fetcher')
musicbrainzngs.set_useragent("brainzfs", "0.1-alpha",
"https://git.finf.uni-hannover.de/Chrysops/brainzfs")
_lock = threading.Lock()
_cache = {'recording': {},
'release-group': {},
'artist': {},
'release': {}}
_cachedates = collections.deque()
_time = 0
_methods = {
'recording': lambda mbid: musicbrainzngs.get_recording_by_id(mbid, includes=['releases', 'artists'])['recording'],
'release-group': lambda mbid: musicbrainzngs.get_release_group_by_id(mbid, includes=['artist-credits'])['release-group'],
'artist': lambda mbid: musicbrainzngs.get_artist_by_id(mbid)['artist'],
'release': lambda mbid: musicbrainzngs.get_release_by_id(mbid, includes=['artists',
'media',
'recordings',
'release-groups'])['release']
}
# for debugging
# from offline import CACHE
# _cache = CACHE
def _time_ms():
    """Return the current wall-clock time in whole milliseconds."""
    now = time.time()
    return int(now * 1000)
def _delay():
    """Block until at least one second has passed since the last request.

    Implements the MusicBrainz rate limit of one web request per second,
    then records the current time in the module-level _time marker.
    """
    global _time
    remaining = 1000 - (_time_ms() - _time)
    if remaining > 0:
        time.sleep(remaining / 1000)
    _time = _time_ms()
def _cache_append(tablename, mbid, result):
    """Insert *result* into the cache, evicting the oldest entry when full.

    Caller must hold _lock.
    """
    _cache[tablename][mbid] = result
    # BUG FIX: append the timestamp *value*; the original appended the
    # _time_ms function object itself, so age-based cleanup compared a
    # threshold against a function instead of a number.
    _cachedates.append((_time_ms(), tablename, mbid))
    if len(_cachedates) > settings.fetcher_cache_length:
        # distinct names so the function parameters are not shadowed
        _, old_table, old_mbid = _cachedates.popleft()
        del _cache[old_table][old_mbid]
def clean_cache():
    """Drop every cached entry older than settings.fetcher_cache_age seconds."""
    with _lock:
        # BUG FIX: call _time_ms(); the original subtracted an int from the
        # function object itself, which raises TypeError.
        threshold = _time_ms() - settings.fetcher_cache_age * 1000
        # BUG FIX: guard against an empty deque (the original indexed
        # _cachedates[0] unconditionally -> IndexError).
        while _cachedates:
            d_time, tablename, mbid = _cachedates[0]
            if d_time >= threshold:
                break
            _cachedates.popleft()
            del _cache[tablename][mbid]
def get_table_by_id(mbid, tablename):
    """Fetch *tablename* data for *mbid* from MusicBrainz.

    Serves cached results transparently when available, and honours the
    MusicBrainz timing rules (one request per second) on a cache miss.
    Raises musicbrainzngs.WebServiceError if anything goes wrong.
    """
    with _lock:
        table = _cache[tablename]
        if mbid in table:
            result = table[mbid]
            _logger.debug("asked fetcher for {}".format(tablename))
        else:
            # delay to follow musicbrainz rules
            _delay()
            # request actual data
            result = _methods[tablename](mbid)
            _cache_append(tablename, mbid, result)
            _logger.debug("asked web for {}".format(tablename))
    return result
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment