#!/usr/bin/python
# Copyright (C) 2001-2008, Christof Meerwald
# http://jabrss.cmeerw.org

# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 2 dated June, 1991.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA

# NOTE(review): this listing was re-indented from a copy in which all
# whitespace had been collapsed. Block nesting below is inferred from
# statement order and must be confirmed against the canonical file. Places
# where program text appears to be missing are marked NOTE(review) and are
# reproduced exactly as they were visible.

import codecs, httplib, md5, rfc822, os, random, re, socket, string, struct
import sys, time, thread, traceback, types, zlib
import sqlite3

import warnings
# Silence the DeprecationWarning raised when xmllib is imported below.
# NOTE(review): the message literal was split across a line break in the
# reviewed text; it is joined here with the single space that was visible.
# Confirm the spacing against the actual warning text, since the filter
# matches on the message.
warnings.filterwarnings('ignore',
                        category=DeprecationWarning,
                        message='The xmllib module is obsolete. Use xml.sax instead.')
import xmllib

# Timeouts in seconds: the first is installed as the default socket timeout,
# the second is applied to the HTTP socket in RSS_Resource.update().
SOCKET_CONNECTTIMEOUT = 60
SOCKET_TIMEOUT = 60

try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO

import mimetools

if hasattr(socket, 'setdefaulttimeout'):
    # Python >= 2.3 has native support for socket timeouts
    socket.setdefaulttimeout(SOCKET_CONNECTTIMEOUT)
    TimeoutException = socket.timeout
else:
    class TimeoutException(socket.error):
        pass

    # try to use timeoutsocket if it is available
    try:
        import timeoutsocket
        timeoutsocket.Timeout = TimeoutException
        timeoutsocket.setDefaultSocketTimeout(SOCKET_CONNECTTIMEOUT)
    except ImportError:
        pass


# NOTE(review): the "(?P" groups in these patterns carry no "<name>" part,
# yet split_url() and compare_items() read the groups 'protocol', 'rest',
# 'host', 'port' and 'path' by name. The names look lost in extraction;
# restore them from the canonical file.
re_validprotocol = re.compile('^(?P[a-z]+):(?P.*)$')
re_validhost = re.compile('^(?P[a-zA-Z0-9-]+(\.[a-zA-Z0-9-]+)+)(:(?P[0-9a-z]+))?(?P(/.*)?)$')
# Host prefixes rejected by split_url(): 10., 127., 172.16-31. and 192.168.
re_blockhost = re.compile('^(10\.|127\.|172\.1[6789]\.|172\.2[0-9]\.|172\.3[01]\.|192\.168\.)')
re_spliturl = re.compile('^(?P[a-z]+)://(?P[^/]+)(?P/?.*)$')

# Translation tables used by normalize_text() for byte strings (str_trans)
# and unicode strings (unicode_trans).
# NOTE(review): as visible here both source halves list \x00-\x0f (the
# unicode table covers 0x00-0x1f, so the second half may have been meant to
# be \x10-\x1f), and the replacement string is 3 characters long while
# string.maketrans() requires both arguments to have equal length. The runs
# of spaces in the replacement string were probably collapsed; confirm.
str_trans = string.maketrans(
    '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f' +
    '\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09\x0a\x0b\x0c\x0d\x0e\x0f',
    ' \x0a ')

# Maps every code point 0x00-0x1f to a space, except 0x0a which is kept.
unicode_trans = {
    0x00 : 0x20, 0x01 : 0x20, 0x02 : 0x20, 0x03 : 0x20,
    0x04 : 0x20, 0x05 : 0x20, 0x06 : 0x20, 0x07 : 0x20,
    0x08 : 0x20, 0x09 : 0x20, 0x0a : 0x0a, 0x0b : 0x20,
    0x0c : 0x20, 0x0d : 0x20, 0x0e : 0x20, 0x0f : 0x20,
    0x10 : 0x20, 0x11 : 0x20, 0x12 : 0x20, 0x13 : 0x20,
    0x14 : 0x20, 0x15 : 0x20, 0x16 : 0x20, 0x17 : 0x20,
    0x18 : 0x20, 0x19 : 0x20, 0x1a : 0x20, 0x1b : 0x20,
    0x1c : 0x20, 0x1d : 0x20, 0x1e : 0x20, 0x1f : 0x20
    }


random.seed()


# Open a new sqlite3 connection to DB_FILENAME.
# isolation_level is set to None, so transactions are issued explicitly
# (see Cursor.begin()/unlock(), which execute BEGIN/COMMIT themselves).
# NOTE(review): 60000 is passed as the second positional argument, which is
# the timeout of a sqlite3 connection; confirm the intended unit.
def RSS_Resource_db():
    db = sqlite3.Connection(DB_FILENAME, 60000)
    db.isolation_level = None
    db.cursor().execute('PRAGMA synchronous=NORMAL')

    return db


# Default logger: accepts any arguments and does nothing.
def default_log_message(*msg):
    return

# Lock stand-in whose acquire()/release() do nothing; used as the default
# value of RSS_Resource._db_sync.
class Null_Synchronizer:
    def acquire(self):
        return

    def release(self):
        return


# configuration settings
INTERVAL_DIVIDER = 3
MIN_INTERVAL = 45*60
MAX_INTERVAL = 24*60*60
DB_FILENAME = 'jabrss_res.db'

log_message = default_log_message

# Override the module-level configuration.
#   db_fname      -> DB_FILENAME (used by RSS_Resource_db())
#   min_interval  -> MIN_INTERVAL, max_interval -> MAX_INTERVAL,
#   interval_div  -> INTERVAL_DIVIDER (all read by RSS_Resource.next_update())
#   logmsg_func   -> log_message
#   dbsync_obj    -> RSS_Resource._db_sync (acquired/released by Cursor)
# The parameter defaults are evaluated once, when this def statement runs,
# so they are the values of the constants defined just above.
def init(db_fname = DB_FILENAME,
         min_interval = MIN_INTERVAL, max_interval = MAX_INTERVAL,
         interval_div = INTERVAL_DIVIDER,
         logmsg_func = log_message,
         dbsync_obj = Null_Synchronizer()):
    global DB_FILENAME, MIN_INTERVAL, MAX_INTERVAL, INTERVAL_DIVIDER
    global log_message

    DB_FILENAME = db_fname
    MIN_INTERVAL = min_interval
    MAX_INTERVAL = max_interval
    INTERVAL_DIVIDER = interval_div
    log_message = logmsg_func

    RSS_Resource._db_sync = dbsync_obj


# Raised by split_url() for URLs that cannot be parsed or are not allowed.
class UrlError(ValueError):
    pass

# Validate a URL and split it into its parts.
# @return (protocol, lower-cased host, path)
# Raises UrlError when: the protocol cannot be parsed, "//" is missing after
# the protocol, the host part does not match re_validhost, the protocol is
# neither http nor https, an explicit port is not the protocol's default
# (80/'http' or 443/'https'), or the host matches re_blockhost.
def split_url(url):
    mo = re_validprotocol.match(url)
    if not mo:
        raise UrlError('can\'t parse protocol of URL "%s"' % (url,))

    url_protocol, url_rest = mo.group('protocol', 'rest')

    if url_rest[:2] != '//':
        raise UrlError('missing "//" after "%s:"' % (url_protocol,))

    url_rest = url_rest[2:]
    mo = re_validhost.match(url_rest)
    if not mo:
        raise UrlError('invalid host in URL "%s"' % (url,))

    url_host, url_port, url_path = mo.group('host', 'port', 'path')
    url_host = url_host.lower()

    # the port is only validated; it is not part of the returned tuple
    if url_protocol == 'http':
        if (url_port != '80') and (url_port != 'http') and (url_port != None):
            raise UrlError('http ports != 80 not allowed')
    elif url_protocol == 'https':
        if (url_port != '443') and (url_port != 'https') and (url_port != None):
            raise UrlError('https ports != 443 not allowed')
    else:
        raise UrlError('unsupported protocol "%s"' % (url_protocol))

    if url_path == '':
        url_path = '/'

    # reduce any run of leading slashes in the path to a single one
    while url_path[:2] == '//':
        url_path = url_path[1:]

    if re_blockhost.match(url_host):
        raise UrlError('host "%s" not allowed' % (url_host,))

    return url_protocol, url_host, url_path


# Clean up a text value: apply the control-character translation table
# matching the string type, strip every line and drop empty lines, then
# drop the empty pieces produced by splitting on ' ' (i.e. collapse runs of
# spaces).
def normalize_text(s):
    if type(s) == types.UnicodeType:
        s = s.translate(unicode_trans)
    else:
        s = s.translate(str_trans)

    s = '\n'.join(filter(lambda x: x != '', map(lambda x: x.strip(), s.split('\n'))))
    s = ' '.join(filter(lambda x: x != '', s.split(' ')))
    return s

# Apply normalize_text() in place to every attribute of o whose name does
# not start with '_' and whose value is a string; returns o.
def normalize_obj(o):
    for attr in dir(o):
        if attr[0] != '_':
            value = getattr(o, attr)
            if type(value) in types.StringTypes:
                setattr(o, attr, normalize_text(value))

    return o

# Normalize a feed item in place and return it. The item must have a
# 'descr' attribute: an empty descr becomes None, descr is copied to
# descr_plain when that attribute is absent, descr_xhtml defaults to None,
# and descr itself is deleted.
def normalize_item(item):
    normalize_obj(item)

    if item.descr == '':
        item.descr = None

    if not hasattr(item, 'descr_plain'):
        item.descr_plain = item.descr

    if not hasattr(item, 'descr_xhtml'):
        item.descr_xhtml = None

    del item.descr

    return item


# NOTE(review): group names are missing here as well; parse_dateTime() reads
# 'year', 'month', 'day', 'hour', 'min', 'sec', 'tzsign', 'tzhour', 'tzmin'.
re_dateTime = re.compile('^(?P[1-9][0-9][0-9][0-9])-(?P[01][0-9])-(?P[0-3][0-9])T(?P[0-2][0-9]):(?P[0-6][0-9]):(?P[0-6][0-9])(\\.[0-9]+)?(Z|(?P[-+])(?P[01][0-9]):(?P[0-6][0-9]))$')

# Parse a "YYYY-MM-DDThh:mm:ss[.fff](Z|+hh:mm|-hh:mm)" string.
# @return integer timestamp from rfc822.mktime_tz(), or None when s is None
#         or does not match re_dateTime
def parse_dateTime(s):
    if s == None:
        return None

    mo = re_dateTime.match(s)
    if mo != None:
        year, month, day, hour, min, sec = map(lambda x: string.atoi(x), mo.group('year', 'month', 'day', 'hour', 'min', 'sec'))
        tzsign, tzhour, tzmin = mo.group('tzsign', 'tzhour', 'tzmin')
        # time zone offset in seconds; stays 0 when no numeric offset matched
        if tzhour != None and tzmin != None:
            tzoff = 60*(60*string.atoi(tzhour) + string.atoi(tzmin))
        else:
            tzoff = 0

        if tzsign == '-':
            tzoff = -tzoff

        tstamp = int(rfc822.mktime_tz((year, month, day, hour, min, sec, 0, 0, 0, tzoff)))
    else:
        tstamp = None

    return tstamp

# Parse an RFC 822 style date string via the rfc822 module.
# @return integer timestamp, or None when s is None or anything raises
#         (the bare except swallows every exception)
def parse_Rfc822DateTime(s):
    if s == None:
        return None

    try:
        tstamp = int(rfc822.mktime_tz(rfc822.parsedate_tz(s)))
    except:
        tstamp = None

    return tstamp


# Decide whether two items (objects with guid, title and link attributes)
# denote the same entry. Returns a true value when they do; the false
# result is either False or 0 depending on the path taken.
#  - titles must be equal
#  - if both guids are set, the guids alone decide
#  - otherwise the links are compared: if both match re_spliturl, protocol
#    and path must be equal and the host names must agree after dropping the
#    last label (when a host has at least two labels), where the shorter
#    label list has to equal the tail of the longer one; if either link does
#    not match re_spliturl the links are compared for plain equality
def compare_items(l, r):
    lguid, ltitle, llink = l.guid, l.title, l.link
    rguid, rtitle, rlink = r.guid, r.title, r.link

    if ltitle == rtitle:
        if (lguid != None) and (rguid != None):
            return lguid == rguid

        lmo = re_spliturl.match(llink)
        rmo = re_spliturl.match(rlink)

        if lmo and rmo:
            lprotocol, lhost, lpath = lmo.group('protocol', 'host', 'path')
            rprotocol, rhost, rpath = rmo.group('protocol', 'host', 'path')

            if lprotocol == rprotocol and lpath == rpath:
                # a trailing '.' in the host yields an empty last label
                lhostparts = string.split(string.lower(lhost), '.')
                if lhostparts[-1] == '':
                    del lhostparts[-1]

                rhostparts = string.split(string.lower(rhost), '.')
                if rhostparts[-1] == '':
                    del rhostparts[-1]

                if len(lhostparts) >= 2:
                    del lhostparts[-1]
                if len(rhostparts) >= 2:
                    del rhostparts[-1]

                # make lhostparts the shorter of the two lists
                if len(lhostparts) > len(rhostparts):
                    tmp = lhostparts
                    lhostparts = rhostparts
                    rhostparts = tmp
                    del tmp

                if len(lhostparts) == len(rhostparts):
                    return lhostparts == rhostparts
                else:
                    return lhostparts == rhostparts[-len(lhostparts):]
            else:
                return 0
        else:
            return llink == rlink
    else:
        return 0


# Wrapper around a database cursor that couples it with the global
# RSS_Resource._db_sync lock and an explicit BEGIN/COMMIT transaction.
# The lock is acquired in the constructor; __del__ commits an open
# transaction and releases the lock if it is still held. Code in this file
# discards a Cursor with "del cursor" when it is done with it.
class Cursor:
    def __init__(self, _db):
        self._txn = False
        self._locked = False

        self._cursor = _db.cursor()
        # NOTE(review): the flag is set before acquire() in the visible
        # statement order; confirm against the canonical file.
        self._locked = True
        RSS_Resource._db_sync.acquire()

    def __del__(self):
        try:
            if self._txn:
                self._cursor.execute('COMMIT')
                pass
        finally:
            if self._locked:
                RSS_Resource._db_sync.release()

    # Commit an open transaction and release the lock if held.
    def unlock(self):
        if self._txn:
            self._cursor.execute('COMMIT')
            self._txn = False

        if self._locked:
            RSS_Resource._db_sync.release()
            self._locked = False

    # (Re-)acquire the lock unless this cursor already holds it.
    def lock(self):
        if not self._locked:
            RSS_Resource._db_sync.acquire()
            self._locked = True

    # Make sure the lock is held and a transaction is open.
    def begin(self):
        self.lock()
        if not self._txn:
            self._cursor.execute('BEGIN')
            self._txn = True

    # Execute a statement (with optional bindings) while holding the lock;
    # returns whatever the underlying cursor's execute() returns.
    def execute(self, stmt, bindings=None):
        self.lock()
        if bindings == None:
            return self._cursor.execute(stmt)
        else:
            return self._cursor.execute(stmt, bindings)

    # Forward only 'lastrowid' and 'rowcount' to the underlying cursor.
    def __getattr__(self, name):
        if name == 'lastrowid':
            return self._cursor.lastrowid
        elif name == 'rowcount':
            return self._cursor.rowcount

        raise AttributeError('object has no attribute \'%s\'' % (name,))

    # NOTE(review): stdlib sqlite3 cursors expose a .connection attribute;
    # verify that getconnection() exists on the cursor type in use.
    def getdb(self):
        return self._cursor.getconnection()

RSS_Resource_Cursor = Cursor


# Simple attribute bag: every keyword argument becomes an attribute.
class Data:
    def __init__(self, **kw):
        for key, value in kw.items():
            setattr(self, key, value)


# httplib connection whose putrequest() sets _http_vsn to 10 for the
# duration of the base-class call (third argument True is skip_host) and to
# 11 afterwards. update() adds the Host header itself.
# NOTE(review): presumably this suppresses httplib's automatic HTTP/1.1
# headers; confirm against the httplib source.
class HTTPConnection(httplib.HTTPConnection):
    def putrequest(self, method, url):
        self._http_vsn = 10
        httplib.HTTPConnection.putrequest(self, method, url, True)
        self._http_vsn = 11

# Same override for HTTPS connections; it calls the
# httplib.HTTPConnection implementation of putrequest().
class HTTPSConnection(httplib.HTTPSConnection):
    def putrequest(self, method, url):
        self._http_vsn = 10
        httplib.HTTPConnection.putrequest(self, method, url, True)
        self._http_vsn = 11


class DecompressorError(ValueError):
    pass


# Decoder interface used by RSS_Resource.update(): feed(s) takes a chunk of
# received data and returns decoded data, flush() returns whatever is left
# at end of input. This one passes data through unchanged.
class Null_Decompressor:
    def feed(self, s):
        return s

    def flush(self):
        return ''


# Incremental decoder for "deflate" content. It is a small state machine:
# self._state_feed holds the current state function, and each state function
# returns None (state advanced, call again), '' (more input is needed) or a
# non-empty string of decompressed data.
class Deflate_Decompressor:
    def __init__(self):
        # running Adler-32 of the output; it is updated but not compared
        # against anything in the visible code
        self._adler32 = zlib.adler32('')
        self._raw_deflate = False

        self._decompress = zlib.decompressobj()
        self._buffer = ''

        self._state_feed = Deflate_Decompressor._feed_header

    def _update_adler32(self, data):
        self._adler32 = zlib.adler32(data, self._adler32)

    # Append s to the input buffer and run the state machine until a state
    # function returns something other than None, the buffer is empty or the
    # final state has been left.
    def feed(self, s):
        self._buffer = self._buffer + s
        data = ''

        while self._state_feed and len(self._buffer):
            res = self._state_feed(self)
            if res:
                self._update_adler32(res)
                data += res

            if res != None:
                break

        return data

    # Drain the state machine at end of input. Raises IOError if a state
    # still asks for more data or if unconsumed input remains.
    def flush(self):
        data = ''

        while self._state_feed:
            res = self._state_feed(self)
            if res:
                self._update_adler32(res)
                data += res
            elif res == '' and self._state_feed != None:
                raise IOError, 'premature EOF'

        if len(self._buffer):
            raise IOError, 'extra data at end of compressed data'

        return data

    # Inspect the first two bytes: if they are not a multiple of 31 when
    # read as a big-endian 16-bit value, the stream is treated as raw
    # deflate and the header bytes '\x78\x9c' are prepended so that the
    # default zlib decompressor accepts it.
    def _feed_header(self):
        if len(self._buffer) >= 2:
            header = self._buffer[:2]
            header_int = struct.unpack('>H', header)[0]
            if header_int % 31 != 0:
                self._raw_deflate = True
                self._buffer = '\x78\x9c' + self._buffer

            self._state_feed = Deflate_Decompressor._feed_data
            return None

        # need more data
        return ''

    # Decompress buffered input, or flush the decompressor when the buffer
    # is empty. Moves on to _feed_eof after a flush or when the decompressor
    # reports unused data.
    def _feed_data(self):
        if len(self._buffer) > 0:
            data = self._decompress.decompress(self._buffer)
            self._buffer = self._decompress.unused_data
        else:
            data = self._decompress.flush()
            self._buffer = self._decompress.unused_data
            self._state_feed = Deflate_Decompressor._feed_eof
            if not data:
                return None

        if self._buffer:
            self._state_feed = Deflate_Decompressor._feed_eof
            if not data:
                return None

        return data

    # Final state: clearing _state_feed ends the feed()/flush() loops.
    def _feed_eof(self):
        self._state_feed = None
        return ''


# Incremental decoder for gzip content, using the same state-machine
# convention as Deflate_Decompressor (None / '' / data return values).
class Gzip_Decompressor:
    # bits of the gzip header flag byte
    FTEXT, FHCRC, FEXTRA, FNAME, FCOMMENT = 1, 2, 4, 8, 16

    def __init__(self):
        # running CRC-32 and byte count of the decompressed output
        self._crc = zlib.crc32('')
        self._size = 0

        # negative wbits: the decompressor is fed the raw deflate stream
        self._decompress = zlib.decompressobj(-zlib.MAX_WBITS)
        self._header_flag = 0
        self._buffer = ''

        self._state_feed = Gzip_Decompressor._feed_header_static

    def _update_crc32(self, data):
        self._crc = zlib.crc32(data, self._crc)
        self._size = self._size + len(data)

    # Same contract as Deflate_Decompressor.feed().
    def feed(self, s):
        self._buffer = self._buffer + s
        data = ''

        while self._state_feed and len(self._buffer):
            res = self._state_feed(self)
            if res:
                self._update_crc32(res)
                data += res

            if res != None:
                break

        return data

    # Same contract as Deflate_Decompressor.flush().
    def flush(self):
        data = ''

        while self._state_feed:
            res = self._state_feed(self)
            if res:
                self._update_crc32(res)
                data += res
            elif res == '' and self._state_feed != None:
                raise IOError, 'premature EOF'

        if len(self._buffer):
            raise IOError, 'extra data at end of compressed data'

        return data

    # Consume the fixed 10-byte header: check the magic bytes and the
    # compression method (8), remember the flag byte, skip the rest.
    def _feed_header_static(self):
        if len(self._buffer) >= 10:
            magic = self._buffer[:2]
            if magic != '\037\213':
                raise IOError, 'Not a gzipped file'
            method = ord(self._buffer[2])
            if method != 8:
                raise IOError, 'Unknown compression method'
            self._header_flag = ord(self._buffer[3])
            # modtime = self.fileobj.read(4)
            # extraflag = self.fileobj.read(1)
            # os = self.fileobj.read(1)
            self._buffer = self._buffer[10:]
            self._state_feed = Gzip_Decompressor._feed_header_flags
            return None

        # need more data
        return ''

    # Consume the optional header fields one at a time. Each call handles
    # the first flag that is still set, clears it once the field is fully
    # buffered and skipped, and returns None to be called again; when no
    # flag is left the state advances to _feed_data.
    def _feed_header_flags(self):
        if self._header_flag & Gzip_Decompressor.FEXTRA:
            if len(self._buffer) >= 2:
                # Read & discard the extra field, if present
                # NOTE(review): the next line is truncated in the reviewed
                # text (the unpack format/arguments and the start of a
                # length check appear to be missing); reproduced as visible.
                xlen = struct.unpack('= (2 + xlen):
                    self._buffer = self._buffer[2 + xlen:]
                    self._header_flag = self._header_flag & ~Gzip_Decompressor.FEXTRA
                    return None
        elif self._header_flag & Gzip_Decompressor.FNAME:
            # Read and discard a null-terminated string containing the filename
            pos = string.find(self._buffer, '\0')
            if pos != -1:
                self._buffer = self._buffer[pos + 1:]
                self._header_flag = self._header_flag & ~Gzip_Decompressor.FNAME
                return None
        elif self._header_flag & Gzip_Decompressor.FCOMMENT:
            # Read and discard a null-terminated string containing a comment
            pos = string.find(self._buffer, '\0')
            if pos != -1:
                self._buffer = self._buffer[pos + 1:]
                self._header_flag = self._header_flag & ~Gzip_Decompressor.FCOMMENT
                return None
        elif self._header_flag & Gzip_Decompressor.FHCRC:
            if len(self._buffer) >= 2:
                self._buffer = self._buffer[2:]
                self._header_flag = self._header_flag & ~Gzip_Decompressor.FHCRC
                return None
        else:
            self._state_feed = Gzip_Decompressor._feed_data
            return None

        # need more data
        return ''

    # Same logic as Deflate_Decompressor._feed_data().
    def _feed_data(self):
        if len(self._buffer) > 0:
            data = self._decompress.decompress(self._buffer)
            self._buffer = self._decompress.unused_data
        else:
            data = self._decompress.flush()
            self._buffer = self._decompress.unused_data
            self._state_feed = Gzip_Decompressor._feed_eof
            if not data:
                return None

        if self._buffer:
            self._state_feed = Gzip_Decompressor._feed_eof
            if not data:
                return None

        return data

    # Trailer handling; requires at least 8 buffered bytes.
    # NOTE(review): the text breaks off inside the struct.unpack() call
    # below. The rest of this method and everything up to the
    # handle_unicode_data() method that follows (including the header of the
    # class those methods belong to) is missing from the reviewed text; the
    # fragment is reproduced as visible.
    def _feed_eof(self):
        if len(self._buffer) >= 8:
            crc32, isize = struct.unpack("'

    # NOTE(review): the methods from here to _current_elem() use
    # self._cdata, self._encoding, self._state, self._items and
    # self._channel. They presumably belong to the Feed_Parser class that
    # update() instantiates, whose definition is not visible (nor is
    # ENTITIES); confirm against the canonical file.

    # Append decoded text to the character data being collected (only while
    # self._cdata is not None); raises ValueError beyond 32 KiB.
    def handle_unicode_data(self, data):
        if self._cdata != None:
            self._cdata += data
            if len(self._cdata) > 32 * 1024:
                raise ValueError('item exceeds maximum allowed size')

    def handle_data(self, data):
        self.handle_unicode_data(data.decode(self._encoding))

    def handle_cdata(self, data):
        self.handle_unicode_data(data.decode(self._encoding))

    # Numeric character reference: decimal, or hexadecimal when the name
    # starts with 'x'. Values that do not parse or lie outside 0..65535 are
    # passed to unknown_charref().
    def handle_charref(self, name):
        try:
            if name[0] == 'x':
                n = int(name[1:], 16)
            else:
                n = int(name)
        except ValueError:
            self.unknown_charref(name)
            return

        if not 0 <= n <= 65535:
            self.unknown_charref(name)
            return

        self.handle_unicode_data(unichr(n))

    # Look the entity up in ENTITIES; unknown entities are logged and
    # otherwise ignored.
    def unknown_entityref(self, entity):
        try:
            self.handle_unicode_data(ENTITIES[entity])
        except KeyError:
            log_message('ignoring unknown entity ref', entity.encode('iso8859-1', 'replace'))

    # Object currently being filled: the last item when state bit 0x08 is
    # set, the channel when bit 0x04 is set, else None.
    def _current_elem(self):
        if self._state & 0x08:
            return self._items[-1]
        elif self._state & 0x04:
            return self._channel
        else:
            return None


##
# Database Schema:
#  'S' -> resource_id sequence number (4-byte struct)
#  'S' + resource_id -> URL
#  'R' + URL -> resource_id (4-byte struct)
#  'D' + resource_id -> Resource data
#  'E' + resource_id -> error information (string)
#  'I' + resource_id -> Resource info
#  'H' + resource_id -> Resource history
#  'T' + resource_id -> Resource times
##
# NOTE(review): the comment above describes a key/value layout, while the
# code below queries SQL tables (resource, resource_history, resource_data);
# it looks out of date.

# One feed, identified by its URL, together with its persisted state.
class RSS_Resource:
    # maximum number of items kept per resource (see _process_new_items)
    NR_ITEMS = 64

    # lock shared by all Cursor objects; replaced through init()
    _db_sync = Null_Synchronizer()
    # proxy host used for plain http requests when set (see update())
    http_proxy = None

    # Load the resource row for url, inserting a new row when none exists.
    # Raises UrlError (from split_url) for invalid URLs. When res_db is None
    # a new connection is opened with RSS_Resource_db().
    def __init__(self, url, res_db=None):
        self._lock = thread.allocate_lock()
        self._url = url
        self._url_protocol, self._url_host, self._url_path = split_url(url)

        self._id = None
        self._last_updated, self._last_modified = None, None
        self._etag = None
        self._invalid_since, self._err_info = None, None
        self._redirect, self._redirect_seq = None, None
        self._penalty = 0
        title, description, link = None, None, None

        if res_db == None:
            db = RSS_Resource_db()
        else:
            db = res_db
        cursor = Cursor(db)

        result = cursor.execute('SELECT rid, last_updated, last_modified, etag, invalid_since, redirect, redirect_seq, penalty, err_info, title, description, link FROM resource WHERE url=?', (self._url,))
        for row in result:
            self._id, self._last_updated, self._last_modified, self._etag, self._invalid_since, self._redirect, self._redirect_seq, self._penalty, self._err_info, title, description, link = row

        if self._id == None:
            cursor.execute('INSERT INTO resource (url) VALUES (?)', (self._url,))
            self._id = cursor.lastrowid

        # replace NULL columns by defaults
        if self._last_updated == None:
            self._last_updated = 0

        if self._penalty == None:
            self._penalty = 0

        if title == None:
            title = self._url
        if link == None:
            link = ''
        if description == None:
            description = ''

        self._channel_info = Data(title=title, link=link, descr=description)

        # history is a list of (time, nr_items) pairs built from the first 16
        # and last 16 selected columns, skipping NULLs in each half
        self._history = []
        result = cursor.execute('SELECT time_items0, time_items1, time_items2, time_items3, time_items4, time_items5, time_items6, time_items7, time_items8, time_items9, time_items10, time_items11, time_items12, time_items13, time_items14, time_items15, nr_items0, nr_items1, nr_items2, nr_items3, nr_items4, nr_items5, nr_items6, nr_items7, nr_items8, nr_items9, nr_items10, nr_items11, nr_items12, nr_items13, nr_items14, nr_items15 FROM resource_history WHERE rid=?', (self._id,))
        for row in result:
            history_times = filter(lambda x: x!=None, row[0:16])
            history_nr = filter(lambda x: x!=None, row[16:32])
            self._history = zip(history_times, history_nr)

        del cursor
        del db


    # Acquire / release the per-resource lock.
    def lock(self):
        self._lock.acquire()

    def unlock(self):
        self._lock.release()


    def url(self):
        return self._url

    def id(self):
        return self._id

    def channel_info(self):
        return self._channel_info

    # @return (last_updated, last_modified, invalid_since); last_modified is
    #         reported as 0 when it is None
    def times(self):
        last_updated, last_modified, invalid_since = self._last_updated, self._last_modified, self._invalid_since
        if last_modified == None:
            last_modified = 0

        return last_updated, last_modified, invalid_since

    # @return (redirect_url, redirect_seq), or (None, None) when this
    #         resource has no redirect; redirect_url is None when the target
    #         rid is not found in the resource table
    def redirect_info(self, res_db=None):
        if self._redirect == None:
            return None, None

        if res_db == None:
            db = RSS_Resource_db()
        else:
            db = res_db
        cursor = Cursor(db)

        result = cursor.execute('SELECT url FROM resource WHERE rid=?', (self._redirect,))
        redirect_url = None
        for row in result:
            redirect_url = row[0]

        del cursor
        return redirect_url, self._redirect_seq

    def penalty(self):
        return self._penalty

    def error_info(self):
        return self._err_info

    def history(self):
        return self._history


    # @return ([item], next_item_id, redirect_resource, redirect_seq, [redirects])
    # locks the resource object if new_items are returned
    #
    # Fetches the feed over HTTP(S), following up to redirect_count
    # redirects, parses it and stores the result. Returns
    # ([], None, None, None, []) without doing anything when the last update
    # is less than 60 seconds ago. redirect_cb, when given, is called as
    # redirect_cb(redirect_url, db, remaining_tries) for a permanently
    # redirected URL and must return (redirect_resource, redirects).
    # Errors caught below are recorded in self._err_info rather than raised.
    def update(self, db=None, redirect_count=5, redirect_cb=None):
        now = int(time.time())

        # sanity check update interval
        if now - self._last_updated < 60:
            return [], None, None, None, []

        error_info = None
        nr_new_items = 0
        feed_xml_downloaded = False
        feed_xml_changed = False
        first_item_id = None
        items = []

        # NOTE(review): prev_updated is not read again in the visible code
        prev_updated = self._last_updated
        self._last_updated = now

        if not self._invalid_since:
            # expect the worst, will be reset later
            self._invalid_since = now

        if db == None:
            db = RSS_Resource_db()
        cursor = None

        redirect_penalty = 0
        redirect_tries = redirect_count
        redirect_permanent = True
        redirect_resource = None
        redirect_seq = None
        redirects = []

        # connection kept from a redirect response for possible reuse
        http_conn = None
        http_protocol = None
        http_host = None

        try:
            url_protocol, url_host, url_path = self._url_protocol, self._url_host, self._url_path

            # redirect_tries is made negative (remaining tries, negated) at
            # the start of each pass; only a followed redirect flips it back
            # to positive so that the loop runs again. It ends at 0 when the
            # tries are used up.
            while redirect_tries > 0:
                redirect_tries = -(redirect_tries - 1)

                if redirect_permanent:
                    # so far only permanent redirects were seen: if the URL
                    # changed, hand over to the target resource
                    redirect_url = url_protocol + '://' + url_host + url_path
                    if redirect_url != self._url:
                        #log_message('redirect: %s -> %s' % (self._url.encode('iso8859-1', 'replace'), redirect_url.encode('iso8859-1', 'replace')))
                        if redirect_cb != None:
                            redirect_resource, redirects = redirect_cb(redirect_url, db, -redirect_tries + 1)

                            # only perform the redirect if target is valid
                            if redirect_resource._invalid_since:
                                error_info = redirect_resource._err_info
                                self._last_modified = redirect_resource._last_modified
                                self._etag = redirect_resource._etag
                                redirect_resource = None
                            else:
                                cursor = Cursor(db)
                                redirect_items, redirect_seq = redirect_resource.get_headlines(0, cursor)

                                items, first_item_id, nr_new_items = self._process_new_items(redirect_items, cursor)
                                del redirect_items

                                self._last_modified = None
                                self._etag = None

                                self._redirect = redirect_resource._id
                                self._redirect_seq = redirect_seq
                                cursor.begin()
                                cursor.execute('UPDATE resource SET redirect=?, redirect_seq=? WHERE rid=?', (self._redirect, self._redirect_seq, self._id))

                            break

                # plain http goes through the proxy (absolute request URL)
                # when one is configured
                if RSS_Resource.http_proxy and (url_protocol == 'http'):
                    host = RSS_Resource.http_proxy
                    request = 'http://' + url_host + url_path
                else:
                    host = url_host
                    request = url_path

                if url_protocol == 'http' and http_protocol == url_protocol and http_host == host and http_conn != None:
                    conn_reused = True
                    h = http_conn
                else:
                    conn_reused = False
                    if url_protocol == 'https':
                        h = HTTPSConnection(host)
                    else:
                        h = HTTPConnection(host)

                http_protocol = url_protocol
                http_host = host
                http_conn = None

                try:
                    h.putrequest('GET', request)
                    if conn_reused:
                        log_message('reused HTTP connection')
                except httplib.CannotSendRequest:
                    log_message('caught CannotSendRequest, opening new connection')
                    # only a reused connection is retried on a fresh one
                    if not conn_reused:
                        raise

                    h = HTTPConnection(host)
                    h.putrequest('GET', request)

                # adjust the socket timeout after the connection has been
                # established
                if hasattr(h.sock, 'settimeout'):
                    h.sock.settimeout(SOCKET_TIMEOUT)
                elif hasattr(h.sock, 'set_timeout'):
                    h.sock.set_timeout(SOCKET_TIMEOUT)

                if not RSS_Resource.http_proxy:
                    h.putheader('Host', url_host)
                h.putheader('Connection', 'Keep-Alive')
                h.putheader('Pragma', 'no-cache')
                h.putheader('Cache-Control', 'no-cache')
                h.putheader('Accept-Encoding', 'gzip, deflate, identity')
                h.putheader('User-Agent', 'JabRSS (http://jabrss.cmeerw.org)')
                # conditional request headers from the previous fetch
                if self._last_modified:
                    h.putheader('If-Modified-Since', rfc822.formatdate(self._last_modified))
                if self._etag != None:
                    h.putheader('If-None-Match', self._etag)
                h.endheaders()
                response = h.getresponse()

                errcode = response.status
                errmsg = response.reason
                headers = response.msg

                # check the error code
                if (errcode >= 200) and (errcode < 300):
                    feed_xml_downloaded = True

                    self._last_modified = parse_Rfc822DateTime(headers.get('last-modified', None))
                    try:
                        self._etag = headers['etag']
                    except:
                        self._etag = None

                    content_encoding = headers.get('content-encoding', None)
                    transfer_encoding = headers.get('transfer-encoding', None)

                    if (content_encoding == 'gzip') or (transfer_encoding == 'gzip'):
                        log_message('gzip-encoded data')
                        decoder = Gzip_Decompressor()
                    elif (content_encoding == 'deflate') or (transfer_encoding == 'deflate'):
                        log_message('deflate-encoded data')
                        decoder = Deflate_Decompressor()
                    else:
                        decoder = Null_Decompressor()

                    content_maintype = headers.getmaintype()
                    content_subtype = headers.getsubtype()
                    charset = headers.getparam('charset')
                    default_charset = None
                    if content_maintype == 'text':
                        if content_subtype.startswith('xml'):
                            # or maybe iso8859-1
                            default_charset = 'us-ascii'
                        else:
                            # not strictly conforming here...
                            default_charset = 'iso8859-1'

                    # NOTE(review): Feed_Parser is not defined in the
                    # visible text (see the note in Gzip_Decompressor)
                    rss_parser = Feed_Parser((self._url_protocol, self._url_host, self._url_path), charset, default_charset)

                    bytes_received = 0
                    bytes_processed = 0
                    xml_started = 0
                    # MD5 over the decoded document, used to detect changes
                    file_hash = md5.new()

                    l = response.read(4096)
                    while l:
                        bytes_received = bytes_received + len(l)
                        if bytes_received > 196 * 1024:
                            raise ValueError('file exceeds maximum allowed size')

                        data = decoder.feed(l)
                        file_hash.update(data)

                        # skip leading whitespace before the document starts
                        if not xml_started:
                            data = string.lstrip( data)
                            if data:
                                xml_started = 1

                        bytes_processed = bytes_processed + len(data)
                        if bytes_processed > 256 * 1024:
                            raise ValueError('file exceeds maximum allowed decompressed size')

                        rss_parser.feed(data)

                        l = response.read(4096)

                    response.close()
                    h.close()
                    data = decoder.flush()
                    file_hash.update(data)
                    rss_parser.feed(data)
                    rss_parser.close()
                    new_channel_info = normalize_obj(rss_parser._channel)

                    cursor = Cursor(db)
                    cursor.begin()
                    # the UPDATE only touches the row when the hash differs,
                    # so rowcount tells whether the document changed
                    hash_buffer = buffer(file_hash.digest())
                    cursor.execute('UPDATE resource SET hash=? WHERE rid=? AND (hash IS NULL OR hash<>?)', (hash_buffer, self._id, hash_buffer))
                    feed_xml_changed = (cursor.rowcount != 0)

                    self._update_channel_info(new_channel_info, cursor)

                    new_items = map(lambda x: normalize_item(x), rss_parser._items)
                    new_items.reverse()

                    items, first_item_id, nr_new_items = self._process_new_items(new_items, cursor)
                    del new_items

                # handle "304 Not Modified"
                elif errcode == 304:
                    # RSS resource is valid
                    self._invalid_since = None
                # handle "301 Moved Permanently", "302 Found" and
                # "307 Temporary Redirect"
                elif (errcode >= 300) and (errcode < 400):
                    # read and discard the response body (at most 48 KiB)
                    bytes_received = 0
                    l = response.read(4096)
                    while l:
                        bytes_received = bytes_received + len(l)
                        if bytes_received > 48 * 1024:
                            raise ValueError('file exceeds maximum allowed size')

                        l = response.read(4096)

                    response.close()
                    if not response.will_close:
                        # maybe we can reuse the connection
                        http_conn = h

                    if errcode != 301:
                        redirect_permanent = False
                        redirect_penalty += 1

                    redirect_url = headers.get('location', None)
                    if redirect_url:
                        # resolve a Location without protocol against the
                        # current URL
                        if not re_validprotocol.match(redirect_url):
                            base_url = '%s://%s' % (url_protocol, url_host)
                            if redirect_url[0] != '/':
                                redirect_url = url_path[:url_path.rindex('/')] + '/' + redirect_url
                            redirect_url = base_url + redirect_url

                        log_message('Following redirect (%d) to "%s"' % (errcode, redirect_url.encode('iso8859-1', 'replace')))
                        url_protocol, url_host, url_path = split_url(redirect_url)
                        # positive again: take another pass through the loop
                        redirect_tries = -redirect_tries
                    else:
                        log_message(errcode, errmsg, headers)
                        error_info = 'HTTP: %d %s' % (errcode, errmsg)
                else:
                    log_message(errcode, errmsg, headers)
                    error_info = 'HTTP: %d %s' % (errcode, errmsg)

            if self._invalid_since and not error_info and redirect_tries == 0:
                error_info = 'redirect: maximum number of redirects exceeded'

        except TimeoutException, e:
            error_info = 'timeout: ' + str(e)
        except socket.error, e:
            error_info = 'socket: ' + str(e)
        except IOError, e:
            error_info = 'I/O error: ' + str(e)
        except httplib.BadStatusLine, e:
            error_info = 'HTTP: bad status line'
        except httplib.IncompleteRead, e:
            error_info = 'HTTP: incomplete read'
        except httplib.UnknownProtocol, e:
            error_info = 'HTTP: unknown protocol'
        except httplib.HTTPException, e:
            error_info = 'HTTP: ' + str(e)
        except DecompressorError, e:
            error_info = 'decompressor: ' + str(e)
        except UnicodeError, e:
            error_info = 'encoding: ' + str(e)
        except LookupError, e:
            error_info = 'encoding: ' + str(e)
        except xmllib.Error, e:
            error_info = 'RDF/XML parser: ' + str(e)
        except ValueError, e:
            error_info = 'misc: ' + str(e)
        except:
            # anything else is printed to stdout; error_info stays unset
            traceback.print_exc(file=sys.stdout)

        if error_info:
            log_message('Error: %s' % (error_info,))

        if cursor == None:
            cursor = Cursor(db)

        cursor.begin()

        if error_info != self._err_info:
            self._err_info = error_info
            cursor.execute('UPDATE resource SET err_info=? WHERE rid=?', (self._err_info, self._id))

        # adjust the penalty used by next_update(); only done while the
        # resource is considered valid
        if not self._invalid_since:
            if feed_xml_downloaded:
                if nr_new_items > 0:
                    # downloaded and new items available, good
                    self._penalty = (5 * self._penalty) / 6
                elif not feed_xml_changed:
                    # downloaded, but not changed, very bad
                    self._penalty = (3 * self._penalty) / 4 + 256
                else:
                    # downloaded and changed, but no new items, bad
                    self._penalty = (15 * self._penalty) / 16 + 64
            else:
                # "not modified" response from server, good
                self._penalty = (3 * self._penalty) / 4

        if redirect_penalty > 0:
            # penalty for temporary redirects
            self._penalty = (7 * self._penalty) / 8 + 128

        cursor.execute('UPDATE resource SET last_modified=?, last_updated=?, etag=?, invalid_since=?, penalty=? WHERE rid=?', (self._last_modified, self._last_updated, self._etag, self._invalid_since, self._penalty, self._id))
        del cursor

        if nr_new_items:
            new_items = items[-nr_new_items:]
            next_item_id = first_item_id + len(items)
        else:
            new_items = []
            next_item_id = None

        return new_items, next_item_id, redirect_resource, redirect_seq, redirects


    # Store new channel information (title, link, descr) when it is not
    # equal to the current object.
    def _update_channel_info(self, new_channel_info, cursor):
        if self._channel_info != new_channel_info:
            self._channel_info = new_channel_info

            cursor.execute('UPDATE resource SET title=?, link=?, description=? WHERE rid=?', (self._channel_info.title, self._channel_info.link, self._channel_info.descr, self._id))


    # @return ([item], first_item_id, nr_new_items)
    def _process_new_items(self, new_items, cursor):
        items, next_item_id = self.get_headlines(0, cursor)
        first_item_id = next_item_id - len(items)

        nr_new_items = self._update_items(items, new_items)
        del new_items

        if nr_new_items:
            # we must not have any other objects locked when trying to lock
            # a resource
            cursor.unlock()
            self.lock()
            cursor.begin()

            # keep only the newest NR_ITEMS items
            if len(items) > RSS_Resource.NR_ITEMS:
                first_item_id += len(items) - RSS_Resource.NR_ITEMS
                del items[:-RSS_Resource.NR_ITEMS]
                # NOTE(review): text is missing inside the next line. The
                # rest of the DELETE statement, the remainder of this method
                # and the beginning of another method (presumably
                # _update_items(items, new_items), which is called above)
                # are absent; the fragment is reproduced as visible and the
                # indentation of what follows is a guess.
                cursor.execute('DELETE FROM resource_data WHERE rid=? AND seq_nr 4) and (len(new_items) > RSS_Resource.NR_ITEMS):
            cutoff = tstamplist[len(tstamplist) / 2]
        elif len(tstamplist) > 2:
            cutoff = tstamplist[0]
        else:
            cutoff = None

        # drop items published before the cutoff, then order by publication
        # time
        new_items = filter(lambda x: (x.published == None) or (x.published >= cutoff), new_items)
        new_items.sort(lambda x, y: cmp(x.published, y.published))

        # replace stored items that compare equal to a new one; append and
        # count the rest
        for item in new_items:
            found = False

            for i in range(0, len(items)):
                if compare_items(items[i], item):
                    items[i] = item
                    found = True

            if not found:
                items.append(item)
                nr_new_items = nr_new_items + 1

        return nr_new_items


    # @return ([item], next id)
    # Reads the stored items with seq_nr >= first_id (None counts as 0) in
    # seq_nr order. "next id" is one more than the highest seq_nr read, or
    # first_id when nothing was read. Uses db_cursor when given, otherwise a
    # new Cursor on db or on a fresh connection.
    def get_headlines(self, first_id, db_cursor=None, db=None):
        if db_cursor == None:
            if db == None:
                cursor = Cursor(RSS_Resource_db())
            else:
                cursor = Cursor(db)
        else:
            cursor = db_cursor

        if first_id == None:
            first_id = 0

        result = cursor.execute('SELECT seq_nr, published, title, link, guid, descr_plain, descr_xhtml FROM resource_data WHERE rid=? AND seq_nr>=? ORDER BY seq_nr', (self._id, first_id))
        items = []
        last_id = first_id
        for seq_nr, published, title, link, guid, descr_plain, descr_xhtml in result:
            if seq_nr >= last_id:
                last_id = seq_nr + 1
            items.append(Data(published=published, title=title, link=link, guid=guid, descr_plain=descr_plain, descr_xhtml=descr_xhtml))

        del cursor
        return items, last_id


    # Compute the time of the next update as self._last_updated plus an
    # interval derived from the item history, clamped to
    # [min_interval, max_interval]. With randomize, a normally distributed
    # offset is added on top.
    def next_update(self, randomize=True):
        min_interval = MIN_INTERVAL
        max_interval = MAX_INTERVAL

        if len(self._history) >= 2:
            hist_items = len(self._history)

            # number of items recorded after the first history entry, and
            # the time covered since that entry
            sum_items = reduce(lambda x, y: (y[0], x[1] + y[1]), self._history[1:])[1]
            time_span = self._last_updated - self._history[0][0]

            if hist_items >= 12:
                # compare the older half of the history with the whole and
                # base the estimate on one half when they differ strongly
                time_span_old = self._history[hist_items / 2][0] - self._history[0][0]
                sum_items_old = reduce(lambda x, y: (y[0], x[1] + y[1]), self._history[1:hist_items / 2 + 1])[1]

                if (3 * sum_items_old < sum_items) and (5 * time_span_old < time_span):
                    time_span = time_span_old
                    sum_items = sum_items_old
                # sum_items_new = sum_items - sum_items_old
                elif (3 * sum_items_old > 2 * sum_items) and (5 * time_span_old > 4 * time_span):
                    time_span = time_span - time_span_old
                    sum_items = sum_items - sum_items_old

            interval = time_span / sum_items / INTERVAL_DIVIDER

            # apply a bonus for well-behaved feeds
            interval = 32 * interval / (64 - self._penalty / 28)
            max_interval = 32 * max_interval / (64 - self._penalty / 28)
            min_interval = 32 * min_interval / (48 - self._penalty / 64)
        elif len(self._history) == 1:
            time_span = self._last_updated - self._history[0][0]

            interval = 30*60 + time_span / 3
            min_interval = 60*60
        elif self._invalid_since:
            time_span = self._last_updated - self._invalid_since

            interval = 4*60*60 + time_span / 4
            max_interval = 48*60*60
        else:
            interval = 8*60*60

        if string.find(string.lower(self._url), 'slashdot.org') != -1:
            # yes, slashdot sucks - this is a special slashdot
            # throttle to avoid being banned by slashdot
            interval = interval + 150*60

        # apply upper and lower bounds to the interval
        interval = min(max_interval, max(min_interval, interval))

        # and add some random factor
        if randomize:
            return self._last_updated + interval + int(random.normalvariate(30, 50 + interval / 50))
        else:
            return self._last_updated + interval


# Look up the URL stored for a resource id; raises KeyError(res_id) when no
# row exists. Uses db_cursor when given, otherwise a Cursor on a new
# connection.
def RSS_Resource_id2url(res_id, db_cursor=None):
    if db_cursor == None:
        cursor = Cursor(RSS_Resource_db())
    else:
        cursor = db_cursor

    url = None
    result = cursor.execute('SELECT url FROM resource WHERE rid=?', (res_id,))
    for row in result:
        url = row[0]

    del cursor

    if url == None:
        raise KeyError(res_id)

    return url

# Validate url with split_url() (which may raise UrlError) and return it
# unchanged; the rebuilt simple_url is currently not returned (see TODO).
def RSS_Resource_simplify(url):
    url_protocol, url_host, url_path = split_url(url)

    simple_url = url_protocol + '://' + url_host + url_path
    # TODO: return simple_url
    return url


# Command line use: update every URL given as an argument and print its
# channel information, error information and new items.
if __name__ == '__main__':
    import sys

    init()
    db = RSS_Resource_db()

    for url in sys.argv[1:]:
        resource = RSS_Resource(url, db)

        new_items, next_item_id, redirect_resource, redirect_seq, redirects = resource.update(db)
        channel_info = resource.channel_info()
        print channel_info.title.encode('iso8859-1', 'replace'), channel_info.link.encode('iso8859-1', 'replace'), channel_info.descr.encode('iso8859-1', 'replace')

        error_info = resource.error_info()
        if error_info:
            print 'error info', resource.error_info()

        if len(new_items) > 0:
            print 'new items', map(lambda x: (x.title.encode('iso8859-1', 'replace'), x.link.encode('iso8859-1', 'replace')), new_items), next_item_id

    db.close()
    del db