mirror of
https://github.com/yacy/yacy_search_server.git
synced 2025-05-24 23:59:33 -04:00
Removed unncessary reflection usage for workflow tasks.
This improves code readability and maintainability (calls hierarchy are easier to read) and eventually performance.
This commit is contained in:
parent
897d3d30cc
commit
9ddf92d143
source/net/yacy
crawler
kelondro/workflow
peers
search
@ -55,13 +55,14 @@ import net.yacy.crawler.retrieval.Request;
|
||||
import net.yacy.crawler.robots.RobotsTxt;
|
||||
import net.yacy.document.TextParser;
|
||||
import net.yacy.kelondro.workflow.WorkflowProcessor;
|
||||
import net.yacy.kelondro.workflow.WorkflowTask;
|
||||
import net.yacy.peers.SeedDB;
|
||||
import net.yacy.repository.Blacklist.BlacklistType;
|
||||
import net.yacy.repository.FilterEngine;
|
||||
import net.yacy.search.Switchboard;
|
||||
import net.yacy.search.index.Segment;
|
||||
|
||||
public final class CrawlStacker {
|
||||
public final class CrawlStacker implements WorkflowTask<Request>{
|
||||
|
||||
public static String ERROR_NO_MATCH_MUST_MATCH_FILTER = "url does not match must-match filter ";
|
||||
public static String ERROR_MATCH_WITH_MUST_NOT_MATCH_FILTER = "url matches must-not-match filter ";
|
||||
@ -99,7 +100,7 @@ public final class CrawlStacker {
|
||||
this.acceptLocalURLs = acceptLocalURLs;
|
||||
this.acceptGlobalURLs = acceptGlobalURLs;
|
||||
this.domainList = domainList;
|
||||
this.requestQueue = new WorkflowProcessor<Request>("CrawlStacker", "This process checks new urls before they are enqueued into the balancer (proper, double-check, correct domain, filter)", new String[]{"Balancer"}, this, "job", 10000, null, WorkflowProcessor.availableCPU);
|
||||
this.requestQueue = new WorkflowProcessor<Request>("CrawlStacker", "This process checks new urls before they are enqueued into the balancer (proper, double-check, correct domain, filter)", new String[]{"Balancer"}, this, 10000, null, WorkflowProcessor.availableCPU);
|
||||
CrawlStacker.log.info("STACKCRAWL thread initialized.");
|
||||
}
|
||||
|
||||
@ -130,7 +131,8 @@ public final class CrawlStacker {
|
||||
clear();
|
||||
}
|
||||
|
||||
public Request job(final Request entry) {
|
||||
@Override
|
||||
public Request process(final Request entry) {
|
||||
// this is the method that is called by the busy thread from outside
|
||||
if (entry == null) return null;
|
||||
|
||||
|
@ -24,10 +24,6 @@
|
||||
|
||||
package net.yacy.kelondro.workflow;
|
||||
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import net.yacy.cora.util.ConcurrentLog;
|
||||
@ -36,43 +32,19 @@ import net.yacy.cora.util.ConcurrentLog;
|
||||
public class InstantBlockingThread<J extends WorkflowJob> extends AbstractBlockingThread<J> implements BlockingThread<J> {
|
||||
private static final String BLOCKINGTHREAD = "BLOCKINGTHREAD";
|
||||
|
||||
private final Method jobExecMethod;
|
||||
private final Object environment;
|
||||
private final Long handle;
|
||||
private final WorkflowTask<J> task;
|
||||
private static AtomicInteger handleCounter = new AtomicInteger(0);
|
||||
private static AtomicInteger instantThreadCounter = new AtomicInteger(0);
|
||||
private static final ConcurrentMap<Long, String> jobs = new ConcurrentHashMap<Long, String>();
|
||||
|
||||
public InstantBlockingThread(final WorkflowProcessor<J> manager) {
|
||||
super();
|
||||
|
||||
// jobExec is the name of a method of the object 'env' that executes the one-step-run
|
||||
// jobCount is the name of a method that returns the size of the job
|
||||
|
||||
// set the manager of blocking queues for input and output
|
||||
setManager(manager);
|
||||
|
||||
// define execution class
|
||||
final Object env = manager.getEnvironment();
|
||||
final String jobExec = manager.getMethodName();
|
||||
this.jobExecMethod = execMethod(env, jobExec);
|
||||
this.environment = (env instanceof Class<?>) ? null : env;
|
||||
setName(this.jobExecMethod.getClass().getName() + "." + this.jobExecMethod.getName() + "." + handleCounter.getAndIncrement());
|
||||
this.handle = Long.valueOf(System.currentTimeMillis() + getName().hashCode());
|
||||
}
|
||||
|
||||
protected static Method execMethod(final Object env, final String jobExec) {
|
||||
final Class<?> theClass = (env instanceof Class<?>) ? (Class<?>) env : env.getClass();
|
||||
try {
|
||||
for (final Method method: theClass.getMethods()) {
|
||||
if ((method.getParameterTypes().length == 1) && (method.getName().equals(jobExec))) {
|
||||
return method;
|
||||
}
|
||||
}
|
||||
throw new NoSuchMethodException(jobExec + " does not exist in " + env.getClass().getName());
|
||||
} catch (final NoSuchMethodException e) {
|
||||
throw new RuntimeException("serverInstantThread, wrong declaration of jobExec: " + e.getMessage());
|
||||
}
|
||||
// define task to be executed
|
||||
this.task = manager.getTask();
|
||||
setName(manager.getName() + "." + handleCounter.getAndIncrement());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -95,21 +67,17 @@ public class InstantBlockingThread<J extends WorkflowJob> extends AbstractBlocki
|
||||
|
||||
instantThreadCounter.incrementAndGet();
|
||||
//System.out.println("started job " + this.handle + ": " + this.getName());
|
||||
jobs.put(this.handle, getName());
|
||||
|
||||
try {
|
||||
out = (J) this.jobExecMethod.invoke(this.environment, new Object[]{next});
|
||||
out = this.task.process(next);
|
||||
} catch (final Throwable e) {
|
||||
ConcurrentLog.severe(BLOCKINGTHREAD, "Internal Error in serverInstantThread.job: " + e.getMessage());
|
||||
ConcurrentLog.severe(BLOCKINGTHREAD, "shutting down thread '" + getName() + "'");
|
||||
final Throwable targetException = (e instanceof InvocationTargetException) ? ((InvocationTargetException) e).getTargetException() : null;
|
||||
ConcurrentLog.logException(e);
|
||||
ConcurrentLog.logException(e.getCause());
|
||||
if (targetException != null) ConcurrentLog.logException(targetException);
|
||||
ConcurrentLog.severe(BLOCKINGTHREAD, "Runtime Error in serverInstantThread.job, thread '" + getName() + "': " + e.getMessage());
|
||||
}
|
||||
instantThreadCounter.decrementAndGet();
|
||||
jobs.remove(this.handle);
|
||||
getManager().increaseJobTime(System.currentTimeMillis() - t);
|
||||
}
|
||||
return out;
|
||||
|
@ -47,27 +47,26 @@ public class WorkflowProcessor<J extends WorkflowJob> {
|
||||
private BlockingQueue<J> input;
|
||||
private final WorkflowProcessor<J> output;
|
||||
private final int maxpoolsize;
|
||||
private final Object environment;
|
||||
private final String processName, methodName, description;
|
||||
private final WorkflowTask<J> task;
|
||||
private final String processName, description;
|
||||
private final String[] childs;
|
||||
private long blockTime, execTime, passOnTime;
|
||||
private long execCount;
|
||||
|
||||
public WorkflowProcessor(
|
||||
final String name, final String description, final String[] childnames,
|
||||
final Object env, final String jobExecMethod,
|
||||
final WorkflowTask<J> task,
|
||||
final int inputQueueSize, final WorkflowProcessor<J> output,
|
||||
final int maxpoolsize) {
|
||||
// start a fixed number of executors that handle entries in the process queue
|
||||
this.environment = env;
|
||||
this.processName = name;
|
||||
this.description = description;
|
||||
this.methodName = jobExecMethod;
|
||||
this.task = task;
|
||||
this.childs = childnames;
|
||||
this.maxpoolsize = maxpoolsize;
|
||||
this.input = new LinkedBlockingQueue<J>(Math.max(maxpoolsize + 1, inputQueueSize));
|
||||
this.output = output;
|
||||
this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(this.methodName));
|
||||
this.executor = Executors.newCachedThreadPool(new NamePrefixThreadFactory(name));
|
||||
this.executorRunning = new AtomicInteger(0);
|
||||
/*
|
||||
for (int i = 0; i < this.maxpoolsize; i++) {
|
||||
@ -85,13 +84,9 @@ public class WorkflowProcessor<J extends WorkflowJob> {
|
||||
processMonitor.add(this);
|
||||
}
|
||||
|
||||
public Object getEnvironment() {
|
||||
return this.environment;
|
||||
}
|
||||
|
||||
public String getMethodName() {
|
||||
return this.methodName;
|
||||
}
|
||||
public WorkflowTask<J> getTask() {
|
||||
return this.task;
|
||||
}
|
||||
|
||||
public int getQueueSize() {
|
||||
if (this.input == null) return 0;
|
||||
@ -169,14 +164,13 @@ public class WorkflowProcessor<J extends WorkflowJob> {
|
||||
this.input = i;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void enQueue(final J in) {
|
||||
// ensure that enough job executors are running
|
||||
if (this.input == null || this.executor == null || this.executor.isShutdown() || this.executor.isTerminated()) {
|
||||
// execute serialized without extra thread
|
||||
//Log.logWarning("PROCESSOR", "executing job " + environment.getClass().getName() + "." + methodName + " serialized");
|
||||
try {
|
||||
final J out = (J) InstantBlockingThread.execMethod(this.environment, this.methodName).invoke(this.environment, new Object[]{in});
|
||||
final J out = this.task.process(in);
|
||||
if (out != null && this.output != null) {
|
||||
this.output.enQueue(out);
|
||||
}
|
||||
|
47
source/net/yacy/kelondro/workflow/WorkflowTask.java
Normal file
47
source/net/yacy/kelondro/workflow/WorkflowTask.java
Normal file
@ -0,0 +1,47 @@
|
||||
// WorkflowTask.java
|
||||
// ---------------------------
|
||||
// Copyright 2018 by luccioman; https://github.com/luccioman
|
||||
//
|
||||
// This is a part of YaCy, a peer-to-peer based web search engine
|
||||
//
|
||||
// LICENSE
|
||||
//
|
||||
// This program is free software; you can redistribute it and/or modify
|
||||
// it under the terms of the GNU General Public License as published by
|
||||
// the Free Software Foundation; either version 2 of the License, or
|
||||
// (at your option) any later version.
|
||||
//
|
||||
// This program is distributed in the hope that it will be useful,
|
||||
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
// GNU General Public License for more details.
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program; if not, write to the Free Software
|
||||
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||||
|
||||
package net.yacy.kelondro.workflow;
|
||||
|
||||
/**
|
||||
* A workflow task to be processed by a {@link WorkflowProcessor}.
|
||||
*
|
||||
* @author luccioman
|
||||
* @param <ENTRY>
|
||||
* the workflow entry type to be processed
|
||||
*
|
||||
*/
|
||||
public interface WorkflowTask<ENTRY> {
|
||||
|
||||
/**
|
||||
* Process a single workflow entry and eventually return the entry to be
|
||||
* processed by the next processor in the workflow
|
||||
*
|
||||
* @param in
|
||||
* the workflow entry
|
||||
* @return an entry for the next processor or null
|
||||
* @throws Exception
|
||||
* when an error occurred
|
||||
*/
|
||||
ENTRY process(final ENTRY in) throws Exception;
|
||||
|
||||
}
|
@ -44,11 +44,13 @@ import net.yacy.kelondro.data.word.WordReference;
|
||||
import net.yacy.kelondro.index.RowHandleSet;
|
||||
import net.yacy.kelondro.rwi.ReferenceContainer;
|
||||
import net.yacy.kelondro.workflow.WorkflowProcessor;
|
||||
import net.yacy.kelondro.workflow.WorkflowTask;
|
||||
import net.yacy.peers.Transmission.Chunk;
|
||||
import net.yacy.search.Switchboard;
|
||||
import net.yacy.search.SwitchboardConstants;
|
||||
import net.yacy.search.index.Segment;
|
||||
|
||||
public class Dispatcher {
|
||||
public class Dispatcher implements WorkflowTask<Transmission.Chunk> {
|
||||
|
||||
/**
|
||||
* the dispatcher class accumulates indexContainerCache objects before they are transfered
|
||||
@ -123,7 +125,7 @@ public class Dispatcher {
|
||||
"transferDocumentIndex",
|
||||
"This is the RWI transmission process",
|
||||
new String[]{"RWI/Cache/Collections"},
|
||||
this, "transferDocumentIndex", concurrentSender * 3, null, concurrentSender);
|
||||
this, concurrentSender * 3, null, concurrentSender);
|
||||
}
|
||||
|
||||
public int bufferSize() {
|
||||
@ -350,15 +352,16 @@ public class Dispatcher {
|
||||
this.indexingTransmissionProcessor.enQueue(chunk);
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Chunk process(final Transmission.Chunk chunk) throws Exception {
|
||||
return transferDocumentIndex(chunk);
|
||||
}
|
||||
|
||||
/**
|
||||
* transfer job: this method is called using reflection from the switchboard
|
||||
* the method is called as a Workflow process. That means it is always called whenever
|
||||
* a job is placed in the workflow queue. This happens in dequeueContainer()
|
||||
* @param chunk
|
||||
* @return
|
||||
* Transfer job implementation
|
||||
*/
|
||||
public Transmission.Chunk transferDocumentIndex(final Transmission.Chunk chunk) {
|
||||
private Transmission.Chunk transferDocumentIndex(final Transmission.Chunk chunk) {
|
||||
|
||||
// try to keep the system healthy; sleep as long as System load is too high
|
||||
while (Protocol.metadataRetrievalRunning.get() > 0) try {Thread.sleep(1000);} catch (InterruptedException e) {break;}
|
||||
|
@ -195,6 +195,7 @@ import net.yacy.kelondro.workflow.BusyThread;
|
||||
import net.yacy.kelondro.workflow.InstantBusyThread;
|
||||
import net.yacy.kelondro.workflow.OneTimeBusyThread;
|
||||
import net.yacy.kelondro.workflow.WorkflowProcessor;
|
||||
import net.yacy.kelondro.workflow.WorkflowTask;
|
||||
import net.yacy.kelondro.workflow.WorkflowThread;
|
||||
import net.yacy.peers.DHTSelection;
|
||||
import net.yacy.peers.Dispatcher;
|
||||
@ -1006,8 +1007,14 @@ public final class Switchboard extends serverSwitch {
|
||||
new String[] {
|
||||
"RWI/Cache/Collections"
|
||||
},
|
||||
this,
|
||||
"storeDocumentIndex",
|
||||
new WorkflowTask<IndexingQueueEntry>() {
|
||||
|
||||
@Override
|
||||
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
|
||||
storeDocumentIndex(in);
|
||||
return null;
|
||||
}
|
||||
},
|
||||
2,
|
||||
null,
|
||||
1);
|
||||
@ -1018,8 +1025,13 @@ public final class Switchboard extends serverSwitch {
|
||||
new String[] {
|
||||
"storeDocumentIndex"
|
||||
},
|
||||
this,
|
||||
"webStructureAnalysis",
|
||||
new WorkflowTask<IndexingQueueEntry>() {
|
||||
|
||||
@Override
|
||||
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
|
||||
return webStructureAnalysis(in);
|
||||
}
|
||||
},
|
||||
WorkflowProcessor.availableCPU + 1,
|
||||
this.indexingStorageProcessor,
|
||||
WorkflowProcessor.availableCPU);
|
||||
@ -1030,8 +1042,13 @@ public final class Switchboard extends serverSwitch {
|
||||
new String[] {
|
||||
"webStructureAnalysis"
|
||||
},
|
||||
this,
|
||||
"condenseDocument",
|
||||
new WorkflowTask<IndexingQueueEntry>() {
|
||||
|
||||
@Override
|
||||
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
|
||||
return condenseDocument(in);
|
||||
}
|
||||
},
|
||||
WorkflowProcessor.availableCPU + 1,
|
||||
this.indexingAnalysisProcessor,
|
||||
WorkflowProcessor.availableCPU);
|
||||
@ -1042,8 +1059,13 @@ public final class Switchboard extends serverSwitch {
|
||||
new String[] {
|
||||
"condenseDocument", "CrawlStacker"
|
||||
},
|
||||
this,
|
||||
"parseDocument",
|
||||
new WorkflowTask<IndexingQueueEntry>() {
|
||||
|
||||
@Override
|
||||
public IndexingQueueEntry process(final IndexingQueueEntry in) throws Exception {
|
||||
return parseDocument(in);
|
||||
}
|
||||
},
|
||||
Math.max(20, WorkflowProcessor.availableCPU * 2), // it may happen that this is filled with new files from the search process. That means there should be enough place for two result pages
|
||||
this.indexingCondensementProcessor,
|
||||
WorkflowProcessor.availableCPU);
|
||||
@ -2890,8 +2912,6 @@ public final class Switchboard extends serverSwitch {
|
||||
|
||||
/**
|
||||
* Parse a response to produce a new document to add to the index.
|
||||
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
|
||||
* @param in an indexing workflow entry containing a response to parse
|
||||
*/
|
||||
public IndexingQueueEntry parseDocument(final IndexingQueueEntry in) {
|
||||
in.queueEntry.updateStatus(Response.QUEUE_STATE_PARSING);
|
||||
@ -3071,10 +3091,11 @@ public final class Switchboard extends serverSwitch {
|
||||
return documents;
|
||||
}
|
||||
|
||||
/**
|
||||
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
|
||||
* @param in an indexing workflow entry containing a response and the related parsed document(s)
|
||||
*/
|
||||
/**
|
||||
* This does a structural analysis of plain texts: markup of headlines, slicing
|
||||
* into phrases (i.e. sentences), markup with position, counting of words,
|
||||
* calculation of term frequency.
|
||||
*/
|
||||
public IndexingQueueEntry condenseDocument(final IndexingQueueEntry in) {
|
||||
in.queueEntry.updateStatus(Response.QUEUE_STATE_CONDENSING);
|
||||
CrawlProfile profile = in.queueEntry.profile();
|
||||
@ -3150,9 +3171,7 @@ public final class Switchboard extends serverSwitch {
|
||||
}
|
||||
|
||||
/**
|
||||
* Perform web structure analysis on parsed documents and update the web structure graph.
|
||||
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
|
||||
* @param in an indexing workflow entry containing parsed document(s)
|
||||
* Perform web structure analysis on parsed documents and update the web structure graph.
|
||||
*/
|
||||
public IndexingQueueEntry webStructureAnalysis(final IndexingQueueEntry in) {
|
||||
in.queueEntry.updateStatus(Response.QUEUE_STATE_STRUCTUREANALYSIS);
|
||||
@ -3166,11 +3185,9 @@ public final class Switchboard extends serverSwitch {
|
||||
}
|
||||
return in;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Store a new entry to the local index.
|
||||
* <strong>Important :</strong> this method is called using reflection as a Workflow process and must therefore remain public.
|
||||
* @param in an indexing workflow entry containing parsed document(s) and a condenser instance
|
||||
*/
|
||||
public void storeDocumentIndex(final IndexingQueueEntry in) {
|
||||
in.queueEntry.updateStatus(Response.QUEUE_STATE_INDEXSTORAGE);
|
||||
|
Loading…
x
Reference in New Issue
Block a user