Commit 1eb7cd4f authored by Gerion Entrup's avatar Gerion Entrup
Browse files

working on collector and plumber

not ready yet
parent 25715fb8
......@@ -13,6 +13,7 @@ from model import Recording
from utils import pairwise
Paths = queue.Queue(maxsize=10)
lock = threading.RLock()
class Collector(threading.Thread):
......@@ -119,6 +120,7 @@ class Collector(threading.Thread):
# nullable attributes
if 'length' in result:
recording.length = result['length']
print(recording)
self._session.add(recording)
# extended mapping
......@@ -198,6 +200,7 @@ class Collector(threading.Thread):
track.gid = trackdata['id']
track.position = trackdata['position']
track.number = trackdata['number']
print(recording)
track.recording = recording
track.name = recording.name
track.medium = self._session.query(Medium).filter(
......
......@@ -11,6 +11,7 @@ class BrainzFS(Base):
def patch_recording():
# http://docs.sqlalchemy.org/en/latest/core/metadata.html#sqlalchemy.schema.Table.append_column
# store the path of the file
setattr(Recording, 'path', Column(String, nullable=False))
# store the ending of the file (ogg,mp3,...)
......
......@@ -5,6 +5,7 @@ import os.path
import queue
import settings
import threading
import datetime
from collector import Collector
from model import Recording
......@@ -30,6 +31,10 @@ class Plumber(threading.Thread):
self._logger = logging.getLogger('plumber')
self._collector = Collector.get_unthreaded_instance(
self._session, logging.getLogger('plumber.collector'))
if settings.minimum_cleaning_age < 300:
self._logger.warn("minimum_cleaning_age is below 300, this could" +
" lead to several SQL error and is considered" +
" unsafe.")
def run(self):
while True:
......@@ -44,6 +49,11 @@ class Plumber(threading.Thread):
self.mark_outdated()
self._session.close()
def old_enough(self, entity):
delta = datetime.datetime.now() - \
datetime.timedelta(seconds=settings.minimum_cleaning_age)
return entity.last_updated < delta
def check_recordings(self):
recordings = self._session.query(Recording).all()
missing = []
......@@ -59,15 +69,16 @@ class Plumber(threading.Thread):
self.clean_recording(recording)
def clean_recording(self, recording):
self._logger.info("Delete recording " + recording.path)
self.clean_artist_credit(recording.artist_credit, max_references=1)
# clean tracks
tracks = self._session.query(Track).filter_by(recording=recording)
for track in tracks:
self._session.delete(track)
# clean recording
self._session.delete(recording)
# check rest of the database for consistency
if self.old_enough(recording):
self._logger.info("Delete recording " + recording.path)
self.clean_artist_credit(recording.artist_credit, max_references=1)
# clean tracks
tracks = self._session.query(Track).filter_by(recording=recording)
for track in tracks:
self._session.delete(track)
# clean recording
self._session.delete(recording)
# check rest of the database for consistency
self.clean_database()
def clean_artist_credit(self, artist_credit, max_references=0):
......@@ -78,7 +89,7 @@ class Plumber(threading.Thread):
count += self._session.query(ReleaseGroup).filter_by(
artist_credit=artist_credit).count()
if count <= max_references:
if count <= max_references and self.old_enough(artist_credit):
# delete dependent artist_credit_names
acns = self.session.query(ArtistCreditName).filter_by(
artist_credit=artist_credit)
......@@ -90,25 +101,25 @@ class Plumber(threading.Thread):
def clean_medium(self, medium, max_references=0):
count = self._session.query(Track).filter_by(
medium=medium).count()
if count <= max_references:
if count <= max_references and self.old_enough(medium):
self._session.delete(medium)
def clean_release(self, release, max_references=0):
count = self._session.query(Medium).filter_by(
release=release).count()
if count <= max_references:
if count <= max_references and self.old_enough(release):
self._session.delete(release)
def clean_release_group(self, release_group, max_references=0):
count = self._session.query(Release).filter_by(
release_group=release_group).count()
if count <= max_references:
if count <= max_references and self.old_enough(release_group):
self._session.delete(release_group)
def clean_artist(self, artist, max_references=0):
count = self._session.query(ArtistCreditName).filter_by(
artist=artist).count()
if count <= max_references:
if count <= max_references and self.old_enough(artist):
self._session.delete(artist)
def clean_database(self):
......@@ -122,10 +133,10 @@ class Plumber(threading.Thread):
self.clean_artist(artist)
def mark_outdated(self):
for recording in self._session.query(Recording):
for recording in self._session.query(Recording).all():
self.check_update_time(recording)
def check_update_time(self, entity):
print("last_updated:")
print("BRABRABRA: last_updated:")
print(entity.last_updated)
print(type(entity.last_updated))
print(type(entity.last_updated)) # datetime.datetime
......@@ -25,28 +25,33 @@ class Schedu(threading.Thread):
walker.start()
walker.join()
def _clean():
Actions.put(Action.check_recordings)
def _repeat_every(self, delay, prio, action,
args=(), kwargs={}, blocking=False):
def wrapped_action(schd, delay, prio, action, args, kwargs, blocking):
logging.getLogger('schedu').info("execute: " + action.__name__)
args=(), kwargs={}, blocking=False, name=None):
def wrapped_action(schd, delay, prio, action, args, kwargs,
blocking, name):
logging.getLogger('schedu').info("execute: " + name)
t = time.time()
action(*args, **kwargs)
t = time.time() - t if blocking else 0
schd.enter(max(0, delay-t), prio, wrapped_action,
argument=(schd, delay, prio, action,
args, kwargs, blocking))
args, kwargs, blocking, name))
if name is None:
name = action.__name__
self._sched.enter(0, prio, wrapped_action,
argument=(self._sched, delay, prio, action,
args, kwargs, blocking))
args, kwargs, blocking, name))
def _init_schedule(self):
self._repeat_every(settings.scan_interval, 2, Schedu._scan,
args=(self._session, self._sourcedir),
blocking=True)
self._repeat_every(settings.clean_interval, 1, Schedu._clean)
self._repeat_every(settings.clean_interval, 1, Actions.put,
args=(Action.check_recordings,),
name="clean recordings")
self._repeat_every(5, 1, Actions.put,
args=(Action.mark_outdated,),
name="check outdated entries")
def run(self):
self._sched.run()
......@@ -6,3 +6,10 @@ fetcher_cache_age = 172800 # 48 hours
directory_prefix = '/'
force_clean = False
scan_interval = 20 # 86400 # 24 hours
clean_interval = 20 # 86400 # 24 hours
update_interval = 2592000 # 1 month
check_update_interval = 3600 # 1 hour
minimum_cleaning_age = 300 # 5 minutes
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment