Initial commit. TODO - handle errors, man page, command completion

This commit is contained in:
Zachary D. Rowitsch
2023-11-21 13:00:40 -05:00
parent 6213e14643
commit 79dd708775
2 changed files with 518 additions and 0 deletions

17
Cargo.toml Normal file

@ -0,0 +1,17 @@
# Package metadata for warcat, a command-line WARC archive tool.
[package]
name = "warcat"
version = "0.1.0"
edition = "2021"
authors = [ "Zachary D. Rowitsch <rowitsch@yahoo.com>" ]
license = "MIT"
description = "A command-line tool for working with WARC files."

# Runtime dependencies.
[dependencies]
warc = "0.3.1"                                              # WARC record reading/writing
clap = { version = "4.4.8", features = ["derive"] }         # CLI argument parsing (derive API)
indicatif = "0.17.7"                                        # progress spinner for `verify`
sanitize-filename = "0.5.0"                                 # safe filenames during `extract`
url = "2.4.1"                                               # URL parsing for extraction paths
sha1_smol = "1.0.0"                                         # hash-based fallback filenames
gzp = "0.11.3"                                              # parallel gzip compression
regex = "1.10.2"                                            # --filter-by-uri patterns

501
src/main.rs Normal file

@ -0,0 +1,501 @@
use clap::{Parser, Subcommand};
use gzp::{
deflate::Gzip,
par::compress::{ParCompress, ParCompressBuilder},
ZWriter,
};
use indicatif::{HumanCount, ProgressBar};
use regex::Regex;
use sanitize_filename::sanitize;
use sha1_smol::Sha1;
use std::io::{BufWriter, Read, Write};
use std::time::Duration;
use url;
use warc::{Record, StreamingBody, WarcHeader, WarcReader, WarcWriter};
// Top-level CLI definition: the subcommand plus global filter/verbosity
// flags that apply across subcommands.
// (Intentionally no `///` doc comment on the struct itself — clap would use
// it as the `about` text in place of the Cargo.toml description.)
#[derive(Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
    #[command(subcommand)]
    command: Commands,
    /// Filter by Content-Type header value, e.g. "text/html"
    #[arg(short = 'c', long)]
    pub filter_by_content_type: Vec<String>,
    /// Filter by WARC-Type header value, e.g. "response"
    #[arg(short = 'w', long)]
    pub filter_by_warc_type: Vec<String>,
    /// Filter WARC-Target-URI with regex, e.g. ".*\.gov"
    #[arg(short = 'u', long)]
    pub filter_by_uri: Vec<String>,
    /// Filter by WARC-Record-ID, e.g. "29d2a1bb-21a3-4074-a0a9-68c8bc301b85"
    #[arg(short = 'r', long)]
    pub filter_by_recordid: Vec<String>,
    /// Verbose output (does not affect all commands)
    #[arg(short, long)]
    verbose: bool,
}
// Subcommands of the warcat CLI. The `///` doc comments below double as
// clap --help text, so any edit to them changes the user-visible help.
#[derive(Subcommand)]
enum Commands {
    /// Naively join archives into one
    Concat {
        /// Target WARC File to create
        target_file: String,
        /// WARC Files to join
        files: Vec<String>,
    },
    /// Extract files from an archive
    Extract {
        /// WARC File
        file: String,
        /// Output directory
        #[arg(short, long, default_value = ".")]
        output: String,
    },
    /// List contents of an archive
    List {
        /// WARC File
        file: String,
    },
    /// Verify digest and validate archive conformance
    Verify {
        /// WARC File
        file: String,
    },
}
/// Use gzp to create a `WarcWriter` backed by a parallel gzip compressor,
/// creating (or fully overwriting) the file at `path`.
///
/// # Errors
/// Returns any `std::io::Error` from opening/creating the file.
fn parallel_gzip_warc_writer(
    path: &str,
) -> Result<WarcWriter<BufWriter<ParCompress<Gzip>>>, std::io::Error> {
    let file = std::fs::OpenOptions::new()
        .write(true)
        .create(true)
        // BUG FIX: truncate an existing file. Without this, overwriting a
        // longer pre-existing file left stale trailing bytes after the new
        // archive, producing a corrupt target.
        .truncate(true)
        .open(path)?;
    let parz: ParCompress<Gzip> = ParCompressBuilder::new().from_writer(file);
    // 1 MiB buffer between the WARC writer and the compressor.
    let writer = std::io::BufWriter::with_capacity(1024 * 1024, parz);
    Ok(WarcWriter::new(writer))
}
/// Naively concatenate gzip-compressed WARC `files` into a single new
/// gzip-compressed WARC at `target_file`, copying only records that pass
/// every non-empty filter set.
// NOTE(review): all error paths return early WITHOUT finishing the gzip
// stream, which can leave a truncated/invalid target file — presumably to
// be addressed by the "handle errors" TODO in the commit message; confirm.
fn concat(
    target_file: &String,
    files: &Vec<String>,
    verbose: &bool,
    filter_by_type: &Vec<String>,
    filter_by_uri: &Vec<regex::Regex>,
    filter_by_warc_type: &Vec<String>,
    filter_by_recordid: &Vec<String>,
) {
    if *verbose {
        println!("Concatenating {} files to {}", files.len(), target_file);
    }
    let mut target = match parallel_gzip_warc_writer(target_file) {
        Ok(t) => t,
        Err(e) => {
            println!("Error opening target: {:?}", e);
            return;
        }
    };
    for file in files {
        if *verbose {
            println!("Adding {}", file);
        }
        let mut t = match WarcReader::from_path_gzip(file) {
            Ok(t) => t,
            Err(e) => {
                println!("Error opening {}: {:?}", file, e);
                return;
            }
        };
        let mut iter = t.stream_records();
        // Per-file count of records actually copied (post-filtering).
        let mut count = 0;
        while let Some(somrec) = iter.next_item() {
            match somrec {
                Ok(rec) => {
                    // A record must match every non-empty filter set to be kept.
                    if !filter_by_type.is_empty() && !filter_by_content_types(&rec, &filter_by_type)
                    {
                        continue;
                    }
                    if !filter_by_uri.is_empty() && !filter_by_uri_regexes(&rec, &filter_by_uri) {
                        continue;
                    }
                    if !filter_by_warc_type.is_empty()
                        && !filter_by_warc_types(&rec, &filter_by_warc_type)
                    {
                        continue;
                    }
                    if !filter_by_recordid.is_empty()
                        && !filter_by_recordids(&rec, &filter_by_recordid)
                    {
                        continue;
                    }
                    // Fully buffer the streaming body so it can be re-written
                    // to the target archive.
                    let record_with_body = match rec.into_buffered() {
                        Ok(r) => r,
                        Err(e) => {
                            println!("Error buffering record: {:?}", e);
                            return;
                        }
                    };
                    let _ = match target.write(&record_with_body) {
                        Ok(_) => {}
                        Err(e) => {
                            println!("Error writing to target: {:?}", e);
                            return;
                        }
                    };
                    count += 1;
                }
                Err(e) => {
                    println!("Error reading record: {:?}", e);
                    return;
                }
            }
        }
        if *verbose {
            println!("Added {} records from {}", count, file);
        }
    }
    // Unwrap the BufWriter/compressor and finish the gzip stream explicitly;
    // dropping it unfinished would leave the archive without a valid trailer.
    let mut gzip_stream = match target.into_inner() {
        Ok(t) => t,
        Err(_) => {
            println!("Error: failed flushing compressed stream");
            return;
        }
    };
    gzip_stream.finish().unwrap();
}
/// Extract each record in the WARC file into `output`, laying files out as
/// `<output>/<sanitized host>/<sanitized url path>/<filename>`. Records
/// whose URL path yields no filename are written under a hash-derived name.
fn extract(
    file: &String,
    output: &String,
    verbose: &bool,
    filter_by_type: &Vec<String>,
    filter_by_uri: &Vec<regex::Regex>,
    filter_by_warc_type: &Vec<String>,
    filter_by_recordid: &Vec<String>,
) {
    let mut t = match WarcReader::from_path_gzip(file) {
        Ok(t) => t,
        Err(e) => {
            println!("Error: {:?}", e);
            return;
        }
    };
    let mut streamiter = t.stream_records();
    while let Some(somrec) = streamiter.next_item() {
        match somrec {
            Ok(mut rec) => {
                // A record must match every non-empty filter set to be extracted.
                if !filter_by_type.is_empty() && !filter_by_content_types(&rec, &filter_by_type) {
                    continue;
                }
                if !filter_by_uri.is_empty() && !filter_by_uri_regexes(&rec, &filter_by_uri) {
                    continue;
                }
                if !filter_by_warc_type.is_empty()
                    && !filter_by_warc_types(&rec, &filter_by_warc_type)
                {
                    continue;
                }
                if !filter_by_recordid.is_empty() && !filter_by_recordids(&rec, &filter_by_recordid)
                {
                    continue;
                }
                // Record ID is only used for diagnostics below.
                let recordid = match rec.header(WarcHeader::RecordID) {
                    Some(id) => id,
                    None => std::borrow::Cow::Borrowed("unknown"),
                };
                let mut buffer = [0; 65536];
                let targeturi = match rec.header(WarcHeader::TargetURI) {
                    Some(uri) => uri,
                    None => {
                        if *verbose {
                            println!("Skipping record with no URI (RecordID: {})", recordid);
                        }
                        continue;
                    }
                };
                let url = match url::Url::parse(&targeturi) {
                    Ok(url) => url,
                    Err(e) => {
                        if *verbose {
                            println!(
                                "Skipping record with invalid URI: {:?} (RecordID: {})",
                                e, recordid
                            );
                        }
                        continue;
                    }
                };
                let host_str = match url.host_str() {
                    Some(host_str) => host_str,
                    None => {
                        if *verbose {
                            // BUG FIX: message previously read "no unparsable
                            // URI host string", the opposite of what happened.
                            println!(
                                "Skipping record with unparsable URI host string (RecordID: {})",
                                recordid
                            );
                        }
                        continue;
                    }
                };
                let urlpath = url.path();
                let url_path_segments = match url.path_segments() {
                    Some(url_path_segments) => url_path_segments,
                    None => {
                        if *verbose {
                            println!(
                                "Skipping record with no parsable URI path segments (RecordID: {})",
                                recordid
                            );
                        }
                        continue;
                    }
                };
                // Last path segment becomes the filename; an empty segment
                // (e.g. "https://host/dir/") triggers the hash-name fallback.
                let mut filename = url_path_segments.last().unwrap_or_default().to_string();
                let need_to_gen_hash = filename.is_empty();
                if need_to_gen_hash {
                    filename = "tempfile".to_string();
                }
                let path = format!("{}/{}/{}", output, sanitize(host_str), sanitize(urlpath));
                match std::fs::create_dir_all(&path) {
                    Ok(_) => {}
                    Err(e) => {
                        println!("Error: {:?}", e);
                        return;
                    }
                }
                let filewpath = format!("{}/{}", &path, sanitize(&filename));
                if *verbose {
                    println!("Extracting {}", filewpath);
                }
                let mut out = std::fs::File::create(&filewpath).unwrap();
                let mut hasher = Sha1::new();
                // BUG FIX: only write/hash the `n` bytes actually read. The
                // original wrote the entire 64 KiB buffer on every iteration
                // (and used `write`, which may write short), corrupting any
                // extracted file whose size wasn't a multiple of 65536.
                loop {
                    let n = rec.read(&mut buffer).unwrap();
                    if n == 0 {
                        break;
                    }
                    out.write_all(&buffer[..n]).unwrap();
                    if need_to_gen_hash {
                        hasher.update(&buffer[..n]);
                    }
                }
                if need_to_gen_hash {
                    // Rename "tempfile" to a name derived from the last 5 hex
                    // chars of the body's SHA-1, so distinct bodies collide rarely.
                    let digest = hasher.digest().to_string();
                    let new_filename = format!("_index_{}", &digest[digest.len() - 5..]);
                    let new_filewpath = format!("{}/{}", &path, new_filename);
                    if *verbose {
                        println!("Renaming {} to {}", filewpath, new_filewpath);
                    }
                    std::fs::rename(&filewpath, new_filewpath).unwrap();
                }
            }
            Err(e) => {
                println!("Error: {:?}", e);
            }
        }
    }
}
/// Verify each record in the WARC file by iterating and parsing every
/// record, reporting read errors via the progress spinner.
/// TODO: Make sure we verify checksums if they are in the header
fn verify(file: &String, verbose: &bool) {
    let t = match WarcReader::from_path_gzip(file) {
        Ok(t) => t,
        Err(e) => {
            println!("Error: {:?}", e);
            return;
        }
    };
    // Spinner with a periodically refreshed record count.
    let pb = ProgressBar::new_spinner();
    pb.enable_steady_tick(Duration::from_millis(500));
    let update_every = 100;
    // Successfully parsing a record is the only check performed so far;
    // digests are not verified yet (see TODO above).
    for somrec in t.iter_records() {
        match somrec {
            Ok(rec) => {
                if *verbose {
                    pb.println(format!(
                        "Verifying record: {}",
                        rec.header(WarcHeader::RecordID).unwrap_or_default()
                    ));
                }
            }
            Err(e) => {
                pb.println(format!("Error: {}", e));
            }
        }
        pb.inc(1);
        // Only refresh the displayed count every `update_every` records to
        // keep terminal output cheap.
        if pb.position() % update_every == 0 {
            pb.set_message(HumanCount(pb.position()).to_string());
        }
    }
    pb.finish_and_clear();
}
/// List each record in the WARC file, printing key headers for every record
/// that passes all non-empty filter sets.
/// TODO: support non-compressed WARC files
fn list(
    file: &String,
    filter_by_type: &Vec<String>,
    filter_by_uri: &Vec<regex::Regex>,
    filter_by_warc_type: &Vec<String>,
    filter_by_recordid: &Vec<String>,
) {
    // Consistent with the other commands: report open failures instead of
    // panicking (this previously used `.unwrap()`).
    let mut t = match WarcReader::from_path_gzip(file) {
        Ok(t) => t,
        Err(e) => {
            println!("Error: {:?}", e);
            return;
        }
    };
    let mut streamiter = t.stream_records();
    while let Some(somrec) = streamiter.next_item() {
        match somrec {
            Ok(rec) => {
                if !filter_by_type.is_empty() && !filter_by_content_types(&rec, &filter_by_type) {
                    continue;
                }
                if !filter_by_uri.is_empty() && !filter_by_uri_regexes(&rec, &filter_by_uri) {
                    continue;
                }
                if !filter_by_warc_type.is_empty()
                    && !filter_by_warc_types(&rec, &filter_by_warc_type)
                {
                    continue;
                }
                if !filter_by_recordid.is_empty() && !filter_by_recordids(&rec, &filter_by_recordid)
                {
                    continue;
                }
                println!(
                    "Record: {}",
                    rec.header(WarcHeader::RecordID).unwrap_or_default()
                );
                println!(
                    "  URI: {}",
                    rec.header(WarcHeader::TargetURI).unwrap_or_default()
                );
                // BUG FIX: label was "Data"; this prints the WARC-Date header.
                println!(
                    "  Date: {}",
                    rec.header(WarcHeader::Date).unwrap_or_default()
                );
                println!(
                    "  Type: {}",
                    rec.header(WarcHeader::WarcType).unwrap_or_default()
                );
                println!(
                    "  Content Type: {}",
                    rec.header(WarcHeader::ContentType).unwrap_or_default()
                );
                println!("  Content length: {}", rec.content_length());
            }
            Err(e) => {
                println!("Error: {}", e);
            }
        }
    }
}
/// Returns true when the record's Content-Type header contains at least one
/// of the supplied `content_types` substrings.
fn filter_by_content_types<T: std::io::Read>(
    record: &Record<StreamingBody<T>>,
    content_types: &Vec<String>,
) -> bool {
    let header_value = record.header(WarcHeader::ContentType).unwrap_or_default();
    content_types
        .iter()
        .any(|wanted| header_value.contains(wanted.as_str()))
}
/// Returns true when the record's WARC-Target-URI matches at least one of
/// the supplied `regexes`.
fn filter_by_uri_regexes<T: std::io::Read>(
    record: &Record<StreamingBody<T>>,
    regexes: &Vec<regex::Regex>,
) -> bool {
    let uri = record.header(WarcHeader::TargetURI).unwrap_or_default();
    regexes.iter().any(|pattern| pattern.is_match(&uri))
}
/// Returns true when the record's WARC-Type header contains at least one of
/// the supplied `warc_types` substrings.
fn filter_by_warc_types<T: std::io::Read>(
    record: &Record<StreamingBody<T>>,
    warc_types: &Vec<String>,
) -> bool {
    let header_value = record.header(WarcHeader::WarcType).unwrap_or_default();
    warc_types
        .iter()
        .any(|wanted| header_value.contains(wanted.as_str()))
}
/// Returns true when the record's WARC-Record-ID contains at least one of
/// the supplied `record_ids` substrings.
fn filter_by_recordids<T: std::io::Read>(
    record: &Record<StreamingBody<T>>,
    record_ids: &Vec<String>,
) -> bool {
    let id = record.header(WarcHeader::RecordID).unwrap_or_default();
    record_ids.iter().any(|wanted| id.contains(wanted.as_str()))
}
fn main() {
let cli = Cli::parse();
let uri_regexes = cli
.filter_by_uri
.iter()
.map(|uri| Regex::new(uri).unwrap())
.collect();
match &cli.command {
Commands::Verify { file } => {
if !cli.filter_by_content_type.is_empty()
|| !cli.filter_by_warc_type.is_empty()
|| !cli.filter_by_uri.is_empty()
|| !cli.filter_by_recordid.is_empty()
{
println!("WARN: Filtering options are ignored for verify command");
}
verify(file, &cli.verbose);
}
Commands::Concat { target_file, files } => {
concat(
target_file,
files,
&cli.verbose,
&cli.filter_by_content_type,
&uri_regexes,
&cli.filter_by_warc_type,
&cli.filter_by_recordid,
);
}
Commands::Extract { file, output } => {
extract(
file,
output,
&cli.verbose,
&cli.filter_by_content_type,
&uri_regexes,
&cli.filter_by_warc_type,
&cli.filter_by_recordid,
);
}
Commands::List { file } => {
if cli.verbose {
println!("WARN: Verbose option is ignored for list command");
}
list(
file,
&cli.filter_by_content_type,
&uri_regexes,
&cli.filter_by_warc_type,
&cli.filter_by_recordid,
);
}
}
}