######################################################################
# Copyright (c) 2007, Petteri Aimonen
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#  * Redistributions of source code must retain the above copyright
#    notice and this list of conditions.
#  * Redistributions in binary form must reproduce the above copyright
#    notice and this list of conditions in the documentation and/or other
#    materials provided with the distribution.

'''Data import

Handles two types of files:
1) Per-animal data in daily files of daily feed amounts
   - Used to update the animal database
2) Per-valve data in a single file of daily amounts
'''

import re
import os.path
import glob

import settings
from pymisc import _, unicode_traceback, recode
from database import db_conn
from dataaccess import getgroup, changegroup, filterfiles, addfiles
from dataaccess import date_fromdb, date_todb, getgroupspervalve

class AnimalFeedImporter:
    '''Import per-animal feed data, updating feeds and animals.'''
    def __init__(self, logger):
        self.cursor = db_conn.cursor()
        self.logger = logger
        self.skiplines = [
            re.compile(r"\s*$"),  # Blank lines (anchored, so non-blank lines still get logged)
            re.compile(r"\* SANOMA ALKAA.*"),
            re.compile(r"\* SANOMA P.*")
        ]

    def shouldskip(self, line):
        '''Return True if line matches a pattern in self.skiplines.
        No parse error is logged for such lines.'''
        for pattern in self.skiplines:
            if pattern.match(line):
                return True
        return False

    def filelist(self):
        '''Get the list of files to read.'''
        datapath = settings.parser.get("DEFAULT", "animalfeed_datapath")
        files = []
        if os.path.isdir(datapath):
            datapath = os.path.join(datapath, "*.dat")
            files = glob.glob(datapath)
            files.sort() # Date in filename
            count = settings.parser.getint("DEFAULT", "animalfeed_filecount")
            files = files[-count:]
        return filterfiles(files)

    def importfile(self, filename):
        '''Import file, commit database, log an error on failure.'''
        try:
            self._importfile(filename)
            addfiles([filename]) # Add to list of imported files
            db_conn.commit()
        except:
            self.logger.log_readfail(filename, unicode_traceback())

    def _importfile(self, filename):
        '''Import a CSV file in the format origid;animalid;date;group;type;grams.
        date is YYYYMMDD, the other fields are plain integers.
        Insert the data into the database, updating animals as needed.
        '''
        format = re.compile(r"\d+;(\d+);(\d{8});(\d+);\d+;(\d+)\s*")
        infile = open(filename, 'rU')

        data = []
        for line in infile:
            # Parse line
            m = format.match(line)
            if not m:
                if not self.shouldskip(line):
                    self.logger.log_skipped_afeed(filename, recode(line))
                continue

            animalid, date, group, grams = m.groups()
            animalid, group, grams = map(int, (animalid, group, grams))

            # Check for moves and add the animal if necessary
            if getgroup(animalid) != group:
                self.animalnotfound(animalid, group)

            data.append((date, animalid, grams))

        infile.close()

        # Save feeds
        self.cursor.executemany("REPLACE INTO feeds VALUES (?,?,?)", data)

    def animalnotfound(self, animalid, group):
        '''Called when the animal, group combination is not found in the database.
        Adds the animal to the database or moves it to the new group.'''
        # Check for moved animals
        oldgroup = getgroup(animalid)
        if oldgroup is not None:
            changegroup(animalid, group)
            self.logger.log_moved_animal(animalid, oldgroup, group)
            return

        # Save animal
        self.cursor.execute("INSERT INTO animals VALUES (?, ?, 1, '')",
                            (animalid, group))
        self.logger.log_new_animal(animalid)

from dataaccess import groupfeed_getlast

class GroupFeedImporter:
    '''Import per-valve feed data from a single file of daily amounts.'''
    def __init__(self, logger):
        self.cursor = db_conn.cursor()
        self.logger = logger

        # Format is date;valve;animalcount;kgs;water1;water2
        self.format = re.compile(r"(\d+);(\d+);(\d+);([0-9,]+);\d+;\d+\s*")
        self.skiplines = [
            re.compile(r"\s*$"),          # Blank lines
            re.compile(r"Date;Valve;.*")  # Column names
        ]

    def shouldskip(self, line):
        '''Return True if line should be skipped.
        No parse error is logged if line matches a pattern in self.skiplines.'''
        for pattern in self.skiplines:
            if pattern.match(line):
                return True
        return False

    def parseline(self, line):
        '''Parse a single line. Return (date, valve, animalcount, grams).
        date is a datetime.date, the others are plain integers.
        If the line is not valid, return None.'''
        m = self.format.match(line)
        if not m:
            if not self.shouldskip(line):
                self.logger.log_skipped_gfeed(recode(line))
            return None
        else:
            # Kilograms use a decimal comma; convert to integer grams
            grams = int(float(m.group(4).replace(',', '.')) * 1000.)
            return (date_fromdb(m.group(1)), int(m.group(2)),
                    int(m.group(3)), grams)

    def importfile(self, filename):
        '''Import file, commit database, log failures.'''
        try:
            self._importfile(filename)
            db_conn.commit()
        except:
            self.logger.log_readfail(filename, unicode_traceback())

    def _importfile(self, filename):
        '''Insert data from file into the database.
        Read until the last imported date is found, then parse the rest.'''
        infile = open(filename, 'rU')
        lastdate = groupfeed_getlast()

        linedata = None
        for line in infile: # Find last position
            linedata = self.parseline(line)
            if not linedata:
                continue
            if lastdate is None or linedata[0] >= lastdate:
                break

        if not linedata: # File is empty or has no valid lines
            infile.close()
            return

        data = [linedata] # Store first row

        # Previous dates by valve
        prevdates = {linedata[1]: linedata[0]}
        for line in infile:
            linedata = self.parseline(line)
            if not linedata:
                continue

            date, valve, count, grams = linedata

            if not getgroupspervalve(valve):
                if grams != 0:
                    self.logger.log_skipped_gfeed(recode(line))
                continue

            if valve in prevdates:
                datedelta = (date - prevdates[valve]).days
                if datedelta > 1: # Missing days, divide sum over the gap
                    self.logger.log_filledin_gfeed()
                    grams = grams // datedelta
            prevdates[valve] = date

            dbdata = (date_todb(date), valve, count, grams)
            data.append(dbdata)

        infile.close()

        self.cursor.executemany("REPLACE INTO groupfeed VALUES (?,?,?,?)", data)

def getimporttasks(logger):
    '''Get the list of import tasks as [(description, function, args, kwargs)].
    Run each task with function(*args, **kwargs).
    logger is an ImportLogger instance used for logging import messages.
    '''
    tasks = []
    importA = AnimalFeedImporter(logger)
    importG = GroupFeedImporter(logger)

    for filename in importA.filelist():
        tasks.append((_("Reading animal feed data from '%s'")
                      % os.path.basename(filename),
                      importA.importfile, (filename, ), {}))

    tasks.append((_('Reading group feed data'), importG.importfile,
                  (settings.parser.get("DEFAULT", "groupfeed_datafile"), ), {}))

    return tasks
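
# ----------------------------------------------------------------------
# Example usage: a minimal sketch, not part of the original module.
# It assumes the sibling modules (settings, database, dataaccess, pymisc)
# are present and configured; StubLogger is a hypothetical stand-in for
# the real ImportLogger instance that getimporttasks() expects.
if __name__ == '__main__':
    class StubLogger:
        '''Hypothetical logger: accepts any log_* call and prints it.'''
        def __getattr__(self, name):
            def log(*args):
                print "%s: %r" % (name, args)
            return log

    # Run every import task in order, as a front-end normally would.
    for description, function, args, kwargs in getimporttasks(StubLogger()):
        print description
        function(*args, **kwargs)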