package groovyx.gpars.dataflow.operator;

import groovy.lang.Closure;
import groovyx.gpars.actor.StaticDispatchActor;
import groovyx.gpars.actor.impl.MessageStream;
import groovyx.gpars.dataflow.DataflowChannelListener;
import groovyx.gpars.dataflow.DataflowReadChannel;
import groovyx.gpars.group.PGroup;
import java.util.Iterator;
import java.util.List;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:BOOT-INF/lib/gpars-1.2.1.jar:groovyx/gpars/dataflow/operator/DataflowProcessorActor.class */
/**
 * Base actor used by a {@link DataflowProcessor}. It holds the processor's input and output
 * channel lists together with the closure to run, and relays lifecycle events, errors and
 * poison pills to the owning processor.
 *
 * <p>Concrete subclasses are expected to supply the actual message handling; the default
 * {@link #onMessage(Object)} rejects every message.
 */
public abstract class DataflowProcessorActor extends StaticDispatchActor<Object> {
    protected static final String CANNOT_OBTAIN_THE_SEMAPHORE_TO_FORK_OPERATOR_S_BODY = "Cannot obtain the semaphore to fork operator's body.";

    /** Input channels; elements are cast to {@link DataflowReadChannel} when accessed. */
    protected final List inputs;

    /** Output channels of the owning processor. */
    protected final List outputs;

    /** The closure associated with this actor (not invoked by this base class). */
    protected final Closure code;

    /** The processor that events, errors and poison pills are delegated to. */
    protected final DataflowProcessor owningProcessor;

    /** Flag available to subclasses; this base class only initializes it to {@code false}. */
    protected boolean stoppingGently = false;

    /**
     * Creates the actor and attaches it to the given parallel group.
     *
     * @param dataflowProcessor the processor owning this actor
     * @param pGroup            the parallel group passed to {@code setParallelGroup}
     * @param list              the output channels (note: outputs come before inputs)
     * @param list2             the input channels
     * @param closure           the code associated with this actor
     */
    public DataflowProcessorActor(DataflowProcessor dataflowProcessor, PGroup pGroup, List list, List list2, Closure closure) {
        setParallelGroup(pGroup);
        this.owningProcessor = dataflowProcessor;
        this.outputs = list;
        this.inputs = list2;
        this.code = closure;
    }

    /** Notifies the owning processor that the actor has started. */
    public void afterStart() {
        this.owningProcessor.fireAfterStart();
    }

    /** Notifies the owning processor that the actor has stopped. */
    void afterStop() {
        this.owningProcessor.fireAfterStop();
    }

    /**
     * Adds the supplied listener to the event manager of every input channel.
     *
     * @param dataflowChannelListener the listener to register on each input
     */
    public final void registerChannelListenersToAllInputs(DataflowChannelListener<Object> dataflowChannelListener) {
        for (Object input : this.inputs) {
            ((DataflowReadChannel) input).getEventManager().addDataflowChannelListener(dataflowChannelListener);
        }
    }

    /**
     * Reports the failure to the owning processor and then terminates this actor.
     *
     * @param th the failure to report
     */
    final void onException(Throwable th) {
        reportException(th);
        terminate();
    }

    /**
     * Sends a message to this actor. An {@link IllegalStateException} raised by the superclass
     * is suppressed when {@code hasBeenStopped()} returns {@code true}; otherwise it is rethrown.
     *
     * @param obj the message to send
     * @return this actor
     * @throws IllegalStateException if the superclass rejects the message while the actor
     *                               has not been stopped
     */
    @Override // groovyx.gpars.actor.AbstractLoopingActor, groovyx.gpars.actor.impl.MessageStream
    public MessageStream send(Object obj) {
        try {
            super.send(obj);
        } catch (IllegalStateException e) {
            // Sending to an already stopped actor is deliberately ignored.
            if (!hasBeenStopped()) {
                throw e;
            }
        }
        return this;
    }

    /**
     * Default handler that rejects every message.
     *
     * @param obj the unrecognized message
     * @throws IllegalStateException always, naming the rejected message
     */
    @Override // groovyx.gpars.actor.StaticDispatchActor
    public void onMessage(Object obj) {
        // The literal previously contained a Groovy-style "$message" placeholder, which Java
        // never interpolates; append the actual message so the error is diagnosable.
        throw new IllegalStateException("The dataflow actor doesn't recognize the message " + obj);
    }

    /**
     * Tells whether the given object is a {@link ControlMessage}.
     *
     * @param obj the object to test, may be {@code null}
     * @return {@code true} if {@code obj} is an instance of {@link ControlMessage}
     */
    public static boolean isControlMessage(Object obj) {
        return obj instanceof ControlMessage;
    }

    /**
     * Handles a {@link PoisonPill}: forwards it via {@link #forwardPoisonPill(Object)},
     * terminates the owning processor and finally counts the pill down. Any other object
     * is ignored.
     *
     * @param obj the received data to inspect
     */
    public final void checkPoison(Object obj) {
        if (obj instanceof PoisonPill) {
            forwardPoisonPill(obj);
            this.owningProcessor.terminate();
            ((PoisonPill) obj).countDown();
        }
    }

    /**
     * Hands the poison pill to the owning processor's {@code bindAllOutputsAtomically}.
     *
     * @param obj the poison pill to forward
     */
    public void forwardPoisonPill(Object obj) {
        this.owningProcessor.bindAllOutputsAtomically(obj);
    }

    /**
     * Passes the failure on to the owning processor's {@code reportError}.
     *
     * @param th the failure to report
     */
    public final void reportException(Throwable th) {
        this.owningProcessor.reportError(th);
    }

    /**
     * Notifies the owning processor that a message arrived on the input at the given index.
     *
     * @param obj the arrived message
     * @param i   index of the input channel within {@link #inputs}
     * @param z   {@code true} to raise the control-message event, {@code false} for the
     *            regular message event
     * @return whatever the owning processor's corresponding {@code fire...} method returns
     */
    public Object fireMessageArrived(Object obj, int i, boolean z) {
        DataflowReadChannel channel = (DataflowReadChannel) this.inputs.get(i);
        if (z) {
            return this.owningProcessor.fireControlMessageArrived(channel, i, obj);
        }
        return this.owningProcessor.fireMessageArrived(channel, i, obj);
    }
}
