36b8d384bd
New colors and metrics on performance graph.
190 lines
5.9 KiB
Python
Executable File
#!/usr/bin/python
# -*- coding: utf-8 -*-

import requests
import json
import re
import subprocess
import multiprocessing
import sqlite3
import datetime
import sys
import time

# Generate environment with:
# pex -r requests -r multiprocessing -e inject:main -o warc-inject -s '.' --no-wheel
# pex -r requests -r multiprocessing -o warc-inject

# import argparse
# parser = argparse.ArgumentParser()
# parser.add_argument('--foo', help='foo help')
# args = parser.parse_args()

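# This script pages through the archive.org "archiveitdigitalcollection"
# collection via the advancedsearch API and, for every *arc.gz file in each
# item, POSTs the file's download URL plus the item metadata to the local
# "gb" inject endpoint at http://localhost:8000/admin/inject, tracking what
# was sent in items.db. Run once with 'init' to create items.db, then with no
# arguments to start the crawl (see main() below).
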
def injectItem(item, c):
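    # item is an archive.org identifier; c is an open sqlite3 connection to
    # items.db. Many getPage() threads call this at once, each with its own
    # connection, which is why the locked-database retries below are needed.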
    # Fetch the item's metadata with the ./ia client, retrying until the
    # metadata feed returns parseable JSON.
    while True:
        try:
            metadata = subprocess.Popen(['./ia', 'metadata', item],
                                        stdout=subprocess.PIPE).communicate()[0]
            #print 'item metadata is ', metadata, 'item is ', item
            md = json.loads(metadata)
            break
        except Exception, e:
            print 'error: metadata feed went down (%s) for: %s' % (e, item)
            time.sleep(10)

    if 'files' not in md:
        return

    # Look up what was already injected for this item.
    res = None
    while True:
        try:
            res = c.execute("select * from items where item = ?", (item,))
            c.commit()  # Getting "database is locked" errors; will this help?
            break
        except sqlite3.OperationalError:
            time.sleep(1)
            print 'got locked database on select, retrying'
            continue

    # Map file name -> last successful update time from previous runs.
    lastUpdate = {}
    for item, fileName, updated, status, took in res:
        if status == -1:  # Auto retry if we couldn't reach gb last time
            continue
        lastUpdate[fileName] = updated

    dbUpdates = []
    for ff in md['files']:
        # Only *arc.gz files (warc.gz / arc.gz) are injected.
        if not ff['name'].endswith('arc.gz'): continue
        itemMetadata = {'mtime': ff['mtime']}
        updateTime = datetime.datetime.fromtimestamp(float(ff['mtime']))
        if ff['name'] in lastUpdate and updateTime <= lastUpdate[ff['name']]:
            print "skip {0} because it is up to date".format(ff['name'])
            continue

        itemMetadata.update(md['metadata'])
        postVars = {'url': 'http://archive.org/download/%s/%s' %
                           (item, ff['name']),
                    'metadata': json.dumps(itemMetadata),
                    'c': 'ait'}
        start = time.time()
        if True:  # Set to False to skip the POST and record status 999 instead.
            try:
                rp = requests.post("http://localhost:8000/admin/inject", postVars)
                statusCode = rp.status_code
            except requests.exceptions.ConnectionError, e:
                print 'error: gb inject', postVars['url'], e
                statusCode = -1  # Sentinel: retried automatically on the next pass.
            #print postVars['url'], rp.status_code
        else:
            statusCode = 999
        took = time.time() - start

        print "sent", ff['name'], 'to gb, took', took
        sys.stdout.flush()

        dbUpdates.append((item, ff['name'], updateTime, statusCode, took))

    # Record what was sent, retrying if another worker holds the lock.
    while True:
        try:
            c.executemany("INSERT INTO items VALUES (?,?,?,?,?)", dbUpdates)
            c.commit()
            break
        except sqlite3.OperationalError:
            time.sleep(10)
            print 'got locked database on insert, retrying'
            continue

    print 'completed %s with %s items injected' % (item, len(dbUpdates))


def getPage(page):
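    # Fetch one page (100 identifiers, newest first) of the collection from
    # the advancedsearch API and inject every item on it. Returns the number
    # of identifiers processed, or 0 for an empty page or a failed request.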
    #r = requests.get('https://archive.org/advancedsearch.php?q=collection%3Aarchiveitdigitalcollection&fl%5B%5D=identifier&rows=1&page={0}&output=json&save=yes'.format(page))
    r = requests.get('https://archive.org/advancedsearch.php?q=collection%3Aarchiveitdigitalcollection&fl%5B%5D=identifier&sort[]=date+desc&rows=100&page={0}&output=json&save=yes'.format(page))
    if r.status_code != 200:
        return 0

    contents = r.content
    jsonContents = json.loads(contents)
    items = [x['identifier'] for x in jsonContents['response']['docs']]
    if len(items) == 0:
        print 'got 0 items for search page', page
        return 0
    print 'loading %s items, %s - %s' % (len(items), items[0], items[-1])

    # Each worker thread opens its own connection; isolation_level=None puts
    # sqlite in autocommit mode.
    db = sqlite3.connect('items.db',
                         isolation_level=None,
                         detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES,
                         timeout=30.0)

    for item in items:
        injectItem(item, db)

    db.close()
    return len(items)


def dumpDb():
    db = sqlite3.connect('items.db', detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES, timeout=30.0)
    c = db.cursor()
    res = c.execute("select * from items")
    for item, fileName, updated, status, took in res:
        print item, fileName, updated, status
    db.close()


def showItems():
    db = sqlite3.connect('items.db', detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES, timeout=30.0)
    c = db.cursor()
    res = c.execute("select distinct item from items")
    for item, in res:
        print item
    db.close()


def main():
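    # Single-argument commands: init (create items.db), reset (delete and
    # recreate it), dump (print every row), items (print distinct item ids),
    # kill (stop running warc-inject processes). With no argument, crawl
    # search pages 1-1299 using a pool of 150 threads.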
    print 'arguments were', sys.argv
    if len(sys.argv) == 2:
        if sys.argv[1] == 'init':
            init()
            print 'initialized'
            return sys.exit(0)
        if sys.argv[1] == 'reset':
            import os
            os.unlink('items.db')
            init()
            return sys.exit(0)
        if sys.argv[1] == 'dump':
            dumpDb()

        if sys.argv[1] == 'items':
            showItems()

        if sys.argv[1] == 'kill':
            subprocess.Popen("""kill `ps auxx |grep warc-inject|awk -e '{print $2}'`""",
                             shell=True, stdout=subprocess.PIPE).communicate()[0]

    else:
        #getPage(3)
        from multiprocessing.pool import ThreadPool
        pool = ThreadPool(processes=150)
        pool.map(getPage, xrange(1, 1300))


def init():
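    # Create the tracking table: one row per file sent, holding the item id,
    # file name, timestamp derived from the file's mtime, HTTP (or sentinel)
    # status code, and how long the POST took in seconds.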
    db = sqlite3.connect('items.db', detect_types=sqlite3.PARSE_DECLTYPES|sqlite3.PARSE_COLNAMES, timeout=30.0)
    c = db.cursor()
    c.execute('''CREATE TABLE items
                 (item text, file text, updated timestamp, status integer, took float)''')
    db.commit()
    db.close()


if __name__ == '__main__':
    main()