# open-source-search-engine/junkdrawer/script/inject/__main__.py
# (562 lines, 19 KiB, Python)

#!/usr/bin/python
# -*- coding: utf-8 -*-
import requests
import json
import re
import subprocess
import multiprocessing
import sqlite3
import datetime
import sys
import time
# import flask
import signal, os
import random
from itertools import repeat
# Items checked within this window are skipped by injectItem(); main() zeroes
# it for the injectfile/forcefile commands.
staleTime = datetime.timedelta(90,0,0) # 90 days, roughly three months for now
# app = flask.Flask(__name__)
# app.secret_key = 'oaisj84alwsdkjhf9238u'
def getDb(makeDates=True):
    """Open the local ./items.db SQLite database (30s busy timeout).

    makeDates -- when True, register detect_types so TIMESTAMP columns come
    back as datetime objects rather than raw strings.
    """
    if not makeDates:
        return sqlite3.connect('./items.db', timeout=30.0)
    return sqlite3.connect('./items.db',
                           detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
                           timeout=30.0)
def handler(signum, frame):
    """Signal handler: log the signal, then abort the blocked call via IOError."""
    print('Signal handler called with signal %s' % signum)
    raise IOError("should kill the thread!")
#Generate environment with:
#pex -r requests -r multiprocessing -e inject:main -o warc-inject -s '.' --no-wheel
#pex -r requests -r multiprocessing -o warc-inject
# see the Makefile
# TODO: add argument parser
# import argparse
# parser = argparse.ArgumentParser()
# parser.add_argument('--foo', help='foo help')
# args = parser.parse_args()
def reallyExecute(c, query, qargs):
    """c.execute(query, qargs) with retry-forever on a locked database.

    Sleeps one second between attempts whenever SQLite raises
    OperationalError (the shared items.db is hit from many threads).
    Returns whatever c.execute returns.
    """
    while True:
        try:
            return c.execute(query, qargs)
        except sqlite3.OperationalError as e:
            time.sleep(1)
            print('got locked database %s,%s, retrying (%s)' % (query, qargs, e))
def reallyExecuteMany(c, query, qargs):
    """c.executemany(query, qargs) with the same retry-on-lock loop as
    reallyExecute().

    Retries once a second while SQLite reports the database as locked.
    Consistency fix: include the caught exception in the log line, as
    reallyExecute() already does (qargs is omitted -- it can be a large
    sequence of parameter tuples).
    """
    while True:
        try:
            return c.executemany(query, qargs)
        except sqlite3.OperationalError as e:
            time.sleep(1)
            print('got locked database %s, retrying (%s)' % (query, e))
def injectItem(item, db, mode):
    """Fetch an archive.org item's metadata and inject its *arc.gz files into
    the local gb instance at localhost:8000, recording progress in items.db
    and on the monitor at localhost:10008.

    item -- archive.org item identifier
    db   -- sqlite3 connection from getDb()
    mode -- 'production' (real inject), 'testing' (sleep instead of posting),
            or 'force' (re-inject even if a file looks up to date)

    Returns the elapsed wall-clock seconds spent on this item.
    """
    itemStart = time.time()
    c = db.cursor()
    # Bug fix: select ROWID explicitly. files.itemId stores the items ROWID
    # (new items use c.lastrowid below; the migrate path in main() does too),
    # so using the raw item text here never matched any files rows for a
    # previously-seen item.
    res = reallyExecute(c, 'select ROWID, checked from items where item = ?',
                        (item,)).fetchone()
    db.commit()
    itemId = None
    if res:
        if res[1] > (datetime.datetime.now() - staleTime):
            print('skipping %s because we checked recently' % item)
            return time.time() - itemStart  # checked within staleTime window
        itemId = res[0]
    # The ia metadata feed is flaky; loop until we get parseable JSON.
    while True:
        try:
            metadata = subprocess.Popen(['./ia', 'metadata', item],
                                        stdout=subprocess.PIPE).communicate()[0]
            md = json.loads(metadata)
            break
        except Exception as e:
            print('error: metadata feed went down (%s) for: %s' % (e, item))
            time.sleep(10)
    if itemId is None:
        reallyExecute(c, "insert INTO items VALUES (?,?)", (item, datetime.datetime.now()))
        itemId = c.lastrowid
        db.commit()
    if 'files' not in md:
        # Bug fix: the original computed this elapsed time but dropped the
        # 'return', then crashed below on md['files'].
        return time.time() - itemStart
    res = reallyExecute(c, "select fileName, updated, status, took from files where itemId = ?",
                        (itemId,)).fetchall()
    db.commit()
    lastUpdate = {}
    for fileName, updated, status, took in res:
        if status == -1:  # gb was unreachable last time: retry those files
            continue
        lastUpdate[fileName] = updated
    dbUpdates = []
    skipped = 0
    # Keep only *.arc.gz / *.warc.gz entries; getattr guard mirrors the
    # original's defensive endswith check without raising on odd values.
    warcs = [f for f in md['files']
             if 'name' in f and getattr(f['name'], 'endswith', None)
             and f['name'].endswith('arc.gz')]
    collectionName = md['metadata'].get('archiveit-collection-name', '')
    for ii, ff in enumerate(warcs):
        itemMetadata = {'mtime': ff['mtime']}
        updateTime = datetime.datetime.fromtimestamp(float(ff['mtime']))
        if mode != 'force' and ff['name'] in lastUpdate and updateTime <= lastUpdate[ff['name']]:
            print("skip {0} because it is up to date".format(ff['name']))
            skipped += 1
            requests.post('http://localhost:10008/progress',
                          json={'item': item, 'total': len(warcs), 'done': ii + 1,
                                'collection-name': collectionName})
            continue
        itemMetadata.update(md['metadata'])
        postVars = {'url': 'http://archive.org/download/%s/%s' % (item, ff['name']),
                    'metadata': json.dumps(itemMetadata),
                    'c': 'ait',
                    'spiderlinks': 0}
        start = time.time()
        if mode == 'testing':
            time.sleep(random.randint(1, 4))
            statusCode = 999
        else:
            try:
                rp = requests.post("http://localhost:8000/admin/inject", postVars)
                statusCode = rp.status_code
            except requests.exceptions.ConnectionError as e:
                print('error: gb inject %s %s' % (postVars['url'], e))
                statusCode = -1  # sentinel: retried automatically next pass
        took = time.time() - start
        print('sent %s to gb, took %s' % (ff['name'], took))
        sys.stdout.flush()
        dbUpdates.append((itemId, ff['name'], updateTime, statusCode, took))
        requests.post('http://localhost:10008/progress',
                      json={'item': item, 'total': len(warcs), 'done': ii + 1,
                            'collection-name': collectionName})
    if len(dbUpdates):
        # NOTE(review): this deletes the rows for ALL previously-known files
        # (including the up-to-date ones we just skipped) but re-inserts only
        # the updated ones, so skipped files look new on the next run; it also
        # matches fileName without an itemId filter. Preserved as-is -- confirm
        # before changing.
        reallyExecuteMany(c, "DELETE FROM files where fileName = ? ",
                          [(name,) for name in lastUpdate])
        reallyExecuteMany(c, "INSERT INTO files VALUES (?,?,?,?,?)",
                          dbUpdates)
        db.commit()
    print('completed %s with %s items injected and %s skipped' % (item, len(dbUpdates), skipped))
    return time.time() - itemStart
def getPage(zippedArgs):
    """Worker for runInjects(): fetch one page of archive.org search results
    and inject every item on it.

    zippedArgs -- (page, mode, resultsPerPage, extraQuery), zipped into one
    tuple so ThreadPool.map can pass a single argument.

    Returns the number of items processed (0 for an empty or failed page).
    Fix: retries forever in a loop with a 60s sleep -- the original recursed
    into itself on every failure, growing the stack during a long outage.
    """
    page, mode, resultsPerPage, extraQuery = zippedArgs
    query = 'collection%3Aarchiveitdigitalcollection+' + extraQuery
    url = 'https://archive.org/advancedsearch.php?q={1}&fl%5B%5D=identifier&sort[]=date+asc&rows={2}&page={0}&output=json'.format(page, query, resultsPerPage)
    while True:
        try:
            r = requests.get(url)
            if r.status_code != 200:
                return 0
            jsonContents = json.loads(r.content)
            items = [x['identifier'] for x in jsonContents['response']['docs']]
            numFound = jsonContents['response']['numFound']
            if len(items) == 0:
                requests.post('http://localhost:10008/progress',
                              json={'total': numFound, 'completed': '', 'query': extraQuery})
                print('got 0 items for search page %s' % page)
                return 0
            print('loading %s items, %s - %s of %s' % (len(items), items[0], items[-1], numFound))
            for item in items:
                # One sqlite connection per item: connections aren't shared
                # across the pool's threads.
                db = getDb()
                took = injectItem(item, db, mode)
                db.close()
                requests.post('http://localhost:10008/progress',
                              json={'total': numFound,
                                    'completed': item,
                                    'query': extraQuery,
                                    'took': took})
            return len(items)
        except Exception as e:
            print('Caught %s sleep and retry %s' % (e, url))
            time.sleep(60)
def dumpDb():
    """Dump both tables to stdout: files rows prefixed 'xxx', items rows
    (with their ROWID) prefixed 'yyy'."""
    db = getDb()
    cur = db.cursor()
    fileRows = cur.execute("select * from files").fetchall()
    for itemId, fileName, updated, status, took in fileRows:
        print('xxx %s %s %s %s %s' % (itemId, fileName, updated, status, took))
    itemRows = cur.execute("select ROWID, item, checked from items").fetchall()
    for row in itemRows:
        print('yyy %s' % (row,))
    db.close()
def showItems():
    """Print every distinct item identifier in the items table, one per line."""
    db = getDb()
    rows = db.cursor().execute("select distinct item from items")
    for (name,) in rows:
        print('%s' % name)
    db.close()
def nuke(lastPid, fromOrbit=False):
    """Stop a running injector.

    lastPid   -- pid string read from running.pid, or None
    fromOrbit -- send SIGKILL instead of SIGTERM

    Asks the monitor to shut down (best effort), then signals the recorded
    pid; if that fails, falls back to a shell hunt for 'warc-inject' in the
    process table.
    """
    try:
        requests.post('http://localhost:10008/shutdown', {})
    except Exception:
        pass  # monitor may not be running; best effort
    sig = signal.SIGTERM
    if fromOrbit:
        sig = signal.SIGKILL
    if lastPid is not None:
        try:
            # Bug fix: the original passed signal.SIGTERM here, so the
            # fromOrbit/SIGKILL escalation never reached the recorded pid.
            ret = os.kill(int(lastPid), sig)
            print('killing  %s' % ret)
            return
        except Exception:
            pass  # stale/garbage pid; fall through to the process search
    killed = subprocess.Popen("""kill `ps auxx |grep warc-inject|grep -v grep|awk -e '{print $2}'`""",
                              shell=True, stdout=subprocess.PIPE).communicate()[0]
    if killed == 'Terminated':
        print('got it')
        return
    print('missed %s' % killed)
def main():
    """CLI entry point: dispatch on sys.argv[1] (no argparse yet; see the
    TODO near the top of the file).

    Commands: test, run <threads>, monitor, init, reset, dump, items, stop,
    kill, migrate, testsig, force <item>, and injectfile/forcefile/
    injectitems <file> <threads>.
    """
    global staleTime
    print 'arguments were', sys.argv, 'pid is', os.getpid()
    if sys.argv[1] != 'monitor':
        # Remember the previous run's pid (used by stop/kill below) and
        # record our own pid for the next invocation.
        try:
            lastPid = open('running.pid', 'r').read()
        except:
            lastPid = None
        open('running.pid', 'w').write(str(os.getpid()))
    # p = multiprocessing.Process(target=serveForever)
    # p.start()
    if sys.argv[1] == 'test':
        query = ''
        if len(sys.argv) == 3:
            query = sys.argv[2]
        #subprocess.Popen(['python','inject', 'monitor'])
        mode = 'testing'  # NOTE(review): unused; the literal is passed below
        runInjects(10, 'testing', query)
    if sys.argv[1] == 'run':
        query = ''
        if len(sys.argv) == 4:
            query = sys.argv[3]
        #subprocess.Popen(['./warc-inject','monitor'])
        threads = int(sys.argv[2])
        runInjects(threads, 'production', query)
        print "done running"
    if len(sys.argv) == 2:
        if sys.argv[1] == 'monitor':
            import monitor
            monitor.main()
        if sys.argv[1] == 'init':
            init()
            print 'initialized'
            return sys.exit(0)
        if sys.argv[1] == 'reset':
            # Drop and recreate the database from scratch.
            os.unlink('items.db')
            init()
            return sys.exit(0)
        if sys.argv[1] == 'dump':
            dumpDb()
        if sys.argv[1] == 'items':
            showItems()
        if sys.argv[1] == 'stop':
            nuke(lastPid)
        if sys.argv[1] == 'kill':
            nuke(lastPid, fromOrbit=True)
        if sys.argv[1] == 'test':
            # NOTE(review): 'test' with no query already ran runInjects in the
            # branch above, then reaches here and runs a second pass -- confirm
            # this duplication is intended.
            subprocess.Popen(['./warc-inject','monitor'])
            mode = 'testing'
            runInjects(10, 'testing')
        if sys.argv[1] == 'migrate':
            # One-shot schema migration: the old single items table becomes
            # separate items/files tables, with files.itemId keyed by the
            # items ROWID.
            db = getDb()
            c = db.cursor()
            c.execute('ALTER TABLE items RENAME TO old')
            db.commit()
            c.execute('''CREATE TABLE files
            (itemId TEXT, fileName TEXT, updated TIMESTAMP, status INTEGER, took FLOAT)''')
            c.execute('''CREATE TABLE items
            (item text, checked timestamp)''')
            c.execute('''CREATE INDEX item_index ON items (item)''')
            # res = c.execute("select count(*) from old")
            # print list(res)
            #res = c.execute("select distinct item from items")
            alreadyItem = {}
            # res = c.execute("select * from old")
            # print (len(list(res)))
            res = c.execute("select * from old")
            now = datetime.datetime.now()
            for (item, fileName, updated, status, took) in res.fetchall():
                #print 'inserting row', item
                if item not in alreadyItem:
                    # First time we see this item: create its row and cache
                    # the ROWID for the files rows that follow.
                    c.execute("insert INTO items VALUES (?,?)", (item, now))
                    alreadyItem[item] = c.lastrowid
                c.execute("INSERT INTO files VALUES (?,?,?,?,?)",
                          (alreadyItem[item], fileName, updated, status, took))
            c.execute('''drop table old''')
            db.commit()
            db.close()
            dumpDb()
            return
        if sys.argv[1] == 'testsig':
            # Deliberately shadows the module-level handler() for this test.
            def handler(signum, frame):
                print 'Signal handler called with signal', signum
                raise IOError("Couldn't open device!")
            # Set the signal handler and a 5-second alarm
            signal.signal(signal.SIGTERM, handler)
            #signal.alarm(5)
            # This open() may hang indefinitely
            time.sleep(100)
            #fd = os.open('/dev/ttyS0', os.O_RDWR)
            signal.alarm(0)        # Disable the alarm
    # if sys.argv[1] == 'serve':
    #     serveForever()
    if len(sys.argv) == 3:
        if sys.argv[1] == 'force':
            # Inject one item now (normal staleness rules still apply).
            itemName = sys.argv[2]
            db = getDb()
            injectItem(itemName, db, 'production')
            sys.exit(0)
    if len(sys.argv) == 4:
        if sys.argv[1] == 'injectfile':
            # Inject every item listed (one per line) in a file, N threads.
            staleTime = datetime.timedelta(0,0,0)  # disable the staleness skip
            from multiprocessing.pool import ThreadPool
            fileName = sys.argv[2]
            items = filter(lambda x: x, open(fileName, 'r').read().split('\n'))
            threads = int(sys.argv[3])
            pool = ThreadPool(processes=threads)
            #print zip(files, repeat(getDb(), len(files)), repeat('production', len(files)))
            def injectItemTupleWrapper(itemName):
                # One sqlite connection per worker thread.
                db = getDb()
                ret = injectItem(itemName, db, 'production')
                db.close()
                return ret
            answer = pool.map(injectItemTupleWrapper, items)
            print 'finished: ', answer
            sys.exit(0)
        if sys.argv[1] == 'forcefile':
            # Same as injectfile but mode='force': ignore per-file timestamps.
            staleTime = datetime.timedelta(0,0,0)
            from multiprocessing.pool import ThreadPool
            fileName = sys.argv[2]
            items = filter(lambda x: x, open(fileName, 'r').read().split('\n'))
            threads = int(sys.argv[3])
            pool = ThreadPool(processes=threads)
            #print zip(files, repeat(getDb(), len(files)), repeat('production', len(files)))
            def injectItemTupleWrapper(itemName):
                db = getDb()
                ret = injectItem(itemName, db, 'force')
                db.close()
                return ret
            answer = pool.map(injectItemTupleWrapper, items)
            print 'finished: ', answer
            sys.exit(0)
        if sys.argv[1] == 'injectitems':
            # Like injectfile but staleTime is left alone, so recently
            # checked items are skipped.
            from multiprocessing.pool import ThreadPool
            fileName = sys.argv[2]
            items = filter(lambda x: x, open(fileName, 'r').read().split('\n'))
            threads = int(sys.argv[3])
            pool = ThreadPool(processes=threads)
            #print zip(files, repeat(getDb(), len(files)), repeat('production', len(files)))
            def injectItemTupleWrapper(itemName):
                db = getDb()
                ret = injectItem(itemName, db, 'production')
                db.close()
                return ret
            answer = pool.map(injectItemTupleWrapper, items)
            sys.exit(0)
def getNumResults(query):
    """Return how many archive.org items match
    collection:archiveitdigitalcollection plus the extra query term
    (0 if the search endpoint does not answer 200)."""
    fullQuery = 'collection%3Aarchiveitdigitalcollection+' + query
    url = ('https://archive.org/advancedsearch.php?q={0}&fl%5B%5D=identifier'
           '&sort[]=date+asc&rows=1&page=0&output=json').format(fullQuery)
    resp = requests.get(url)
    if resp.status_code != 200:
        return 0
    payload = json.loads(resp.content)
    return payload['response']['numFound']
def runInjects(threads, mode='production', query=''):
    """Fan a full search-result crawl out over a thread pool.

    threads -- number of worker threads
    mode    -- forwarded to injectItem via getPage ('production'/'testing')
    query   -- extra archive.org search term appended to the collection query

    Splits numFound results into pages and maps getPage over them. On
    Ctrl-C/SystemExit, tells the monitor to shut down and exits.
    """
    from multiprocessing.pool import ThreadPool
    import math
    pool = ThreadPool(processes=threads)
    try:
        totalResults = getNumResults(query)
        resultsPerPage = 100
        maxPages = int(math.ceil(totalResults / float(resultsPerPage)))
        if maxPages < threads:
            # Shrink the pages so every worker thread gets one.
            maxPages = threads
            resultsPerPage = int(math.ceil(totalResults / float(maxPages)))
        print('%s  threads, %s total, %s pages %s results per page'
              % (threads, totalResults, maxPages, resultsPerPage))
        # Off-by-one fix: search pages are 1-based, so the range must include
        # maxPages itself -- xrange(1, maxPages) silently dropped the last
        # page (zip truncated the repeats to match).
        answer = pool.map(getPage, zip(xrange(1, maxPages + 1),
                                       repeat(mode, maxPages),
                                       repeat(resultsPerPage, maxPages),
                                       repeat(query, maxPages)))
        print("finished item pass %s" % (answer,))
    except (KeyboardInterrupt, SystemExit):
        print('ok, caught')
        requests.post('http://localhost:10008/shutdown', {})
        sys.exit(0)
    #raise
def init():
    """Create the files and items tables plus the item lookup index in items.db."""
    db = getDb()
    cur = db.cursor()
    cur.execute('''CREATE TABLE files
    (itemId TEXT, fileName TEXT, updated TIMESTAMP, status INTEGER, took FLOAT)''')
    cur.execute('''CREATE TABLE items
    (item text, checked timestamp)''')
    cur.execute('''CREATE INDEX item_index ON items (item)''')
    db.commit()
    db.close()
# def serveForever():
# @app.route('/',
# methods=['GET', 'POST'], endpoint='home')
# def home():
# db = getDb(makeDates=False)
# res = db.execute('select * from items limit 10')
# for item, checked in res.fetchall():
# print item
# try:
# metadata = subprocess.Popen(['./ia','metadata', item],
# stdout=subprocess.PIPE).communicate()[0]
# break
# except Exception, e:
# pass
# db.close()
# return flask.make_response(metadata)
# @app.route('/progress',
# methods=['GET', 'POST'], endpoint='progress')
# def progress():
# r = requests.get('https://archive.org/advancedsearch.php?q=collection%3Aarchiveitdigitalcollection&fl%5B%5D=identifier&sort[]=date+desc&rows=1&page=1&output=json')
# if r.status_code != 200:
# return flask.make_response(json.dumps({error:'ia search feed is down'}),
# 'application/json')
# contents = r.content
# jsonContents = json.loads(contents)
# numFound = jsonContents['response']['numFound']
# db = getDb()
# examinedItems = db.execute('select count(*) from items').fetchone()
# itemsWithWarc = db.execute('select count(*) from items where ROWID in (select itemId from files where files.status = 200)').fetchone()
# return flask.make_response(json.dumps({'totalItems':numFound,
# 'examinedItems':examinedItems,
# 'itemsWithWarc':itemsWithWarc
# }, indent=4), 'application/json')
# @app.route('/items',
# methods=['GET', 'POST'], endpoint='items')
# def items():
# db = getDb(makeDates=False)
# c = db.cursor()
# res = c.execute("select item, checked from items")
# out = []
# for item, checked in res.fetchall():
# out.append({'item':item, 'checked':checked})
# db.close()
# return flask.make_response(json.dumps(out), 'application/json')
# app.run('0.0.0.0',
# port=7999,
# debug=False,
# use_reloader=False,
# use_debugger=False)
# Script entry point (this file is a package __main__).
if __name__ == '__main__':
    main()