Subversion Repositories aduna

[/] [org.openrdf/] [alibaba/] [trunk/] [optimistic-sail/] [src/] [main/] [java/] [org/] [openrdf/] [sail/] [optimistic/] [OptimisticSail.java] - Rev 8555

Go to most recent revision | Compare with Previous | Blame | View Log

package org.openrdf.sail.optimistic;

import info.aduna.concurrent.locks.Lock;
import info.aduna.concurrent.locks.ReadWriteLockManager;
import info.aduna.concurrent.locks.WritePrefReadWriteLockManager;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.openrdf.cursor.Cursor;
import org.openrdf.model.Model;
import org.openrdf.model.ValueFactory;
import org.openrdf.query.BindingSet;
import org.openrdf.query.algebra.QueryModel;
import org.openrdf.query.algebra.evaluation.QueryBindingSet;
import org.openrdf.sail.NotifyingSail;
import org.openrdf.sail.NotifyingSailConnection;
import org.openrdf.sail.Sail;
import org.openrdf.sail.SailChangedListener;
import org.openrdf.sail.SailConnection;
import org.openrdf.sail.helpers.AutoBeginNotifyingConnection;
import org.openrdf.sail.helpers.SailWrapper;
import org.openrdf.sail.inferencer.InferencerConnection;
import org.openrdf.sail.inferencer.helpers.AutoBeginInferencerConnection;
import org.openrdf.sail.optimistic.exceptions.ConcurrencyException;
import org.openrdf.sail.optimistic.helpers.DeltaMerger;
import org.openrdf.sail.optimistic.helpers.EvaluateOperation;
import org.openrdf.store.StoreException;

/**
 * Sail wrapper that hands out {@link OptimisticConnection}s and keeps track of
 * their open transactions. When one transaction is prepared, the recorded read
 * operations of every other open transaction are re-checked against the
 * prepared changes, and affected connections are flagged with a
 * {@link ConcurrencyException}.
 */
public class OptimisticSail extends SailWrapper implements NotifyingSail {

    /** Name of the marker binding that {@link #effects} looks for in results. */
    private static final String DELTA_VARNAME = "-delta-";

    /**
     * Write-locked by the prepared connection from {@link #prepare} until
     * {@link #end}; read locks are handed out through {@link #getReadLock()}.
     */
    private final ReadWriteLockManager preparing = new WritePrefReadWriteLockManager();

    /**
     * Every open transaction holds a read lock from this manager; an exclusive
     * transaction holds its write lock instead.
     */
    private final ReadWriteLockManager locker = new WritePrefReadWriteLockManager();

    /** Open transactions and the {@link #locker} lock each holds. Guarded by {@code this}. */
    private final Map<OptimisticConnection, Lock> transactions = new HashMap<OptimisticConnection, Lock>();

    /** Write lock on {@link #preparing} held by {@link #prepared}, or null. */
    private volatile Lock preparedLock;

    /** Connection that currently holds {@link #preparedLock}, or null. */
    private volatile OptimisticConnection prepared;

    /** Fast-path flag so {@link #getListeners()} can skip locking when nothing is registered. */
    private volatile boolean listenersIsEmpty = true;

    /** Registered listeners. Guarded by its own monitor. */
    private final Set<SailChangedListener> listeners = new HashSet<SailChangedListener>();

    public OptimisticSail() {
        super();
    }

    public OptimisticSail(Sail baseSail) {
        super(baseSail);
    }

    /**
     * Registers a listener.
     *
     * @param listener the listener to add
     */
    public void addSailChangedListener(SailChangedListener listener) {
        synchronized (listeners) {
            listeners.add(listener);
            listenersIsEmpty = false;
        }
    }

    /**
     * Unregisters a listener.
     *
     * @param listener the listener to remove
     */
    public void removeSailChangedListener(SailChangedListener listener) {
        synchronized (listeners) {
            listeners.remove(listener);
            // Restore the lock-free fast path once the last listener is gone.
            listenersIsEmpty = listeners.isEmpty();
        }
    }

    /**
     * Opens a connection on the wrapped sail and decorates it with the
     * optimistic (and auto-begin) behaviour, using the inferencer variants when
     * the underlying connection is an {@link InferencerConnection}.
     */
    @Override
    public NotifyingSailConnection getConnection() throws StoreException {
        SailConnection con = super.getConnection();
        if (con instanceof InferencerConnection) {
            InferencerConnection inferencer = (InferencerConnection) con;
            return new AutoBeginInferencerConnection(
                    new OptimisticInferencerConnection(this, inferencer));
        }
        return new AutoBeginNotifyingConnection(
                new OptimisticConnection(this, con));
    }

    /**
     * @return a snapshot copy of the registered listeners; never null
     */
    Set<SailChangedListener> getListeners() {
        if (listenersIsEmpty) {
            return Collections.emptySet();
        }
        synchronized (listeners) {
            return new HashSet<SailChangedListener>(listeners);
        }
    }

    /**
     * Registers a transaction for the given connection, taking a read lock on
     * {@link #locker} for it.
     *
     * @param con the connection starting a transaction
     * @throws InterruptedException if interrupted while waiting for the lock
     */
    synchronized void begin(OptimisticConnection con)
            throws InterruptedException {
        transactions.put(con, locker.getReadLock());
    }

    /**
     * Obtains a read lock on the {@link #preparing} lock manager; the caller is
     * responsible for releasing it.
     *
     * @return the acquired read lock
     * @throws StoreException if the thread is interrupted while waiting
     */
    public Lock getReadLock() throws StoreException {
        try {
            return preparing.getReadLock();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new StoreException(e);
        }
    }

    /**
     * Marks the given connection as prepared: takes the write lock on
     * {@link #preparing} and checks every other open transaction's recorded
     * read operations against this connection's added and removed statements,
     * flagging a {@link ConcurrencyException} on each connection that is
     * affected. The write lock stays held until {@link #end} is called for the
     * prepared connection.
     *
     * @param prepared the connection about to commit
     * @throws InterruptedException if interrupted while waiting for the write lock
     * @throws StoreException if the conflict check against the store fails
     */
    synchronized void prepare(OptimisticConnection prepared)
            throws InterruptedException, StoreException {
        assert transactions.containsKey(prepared);
        // NOTE(review): this waits for the write lock while holding the sail
        // monitor, so begin/end/exclusive on other threads block meanwhile --
        // confirm that holders of the read lock never need those methods.
        preparedLock = preparing.getWriteLock();
        this.prepared = prepared;
        synchronized (prepared) {
            Model added = prepared.getAddedModel();
            Model removed = prepared.getRemovedModel();
            // Bypass the overridden getConnection(): a plain connection to the
            // wrapped sail is used to evaluate the conflict checks.
            SailConnection sail = super.getConnection();
            try {
                for (OptimisticConnection con : transactions.keySet()) {
                    if (con == prepared) {
                        continue;
                    }
                    synchronized (con) {
                        for (EvaluateOperation op : con.getReadOperations()) {
                            boolean affected =
                                    (!added.isEmpty() && effects(added, op, sail))
                                    || (!removed.isEmpty() && effects(removed, op, sail));
                            if (affected) {
                                // One conflict is enough for this connection.
                                con.setConclict(new ConcurrencyException());
                                break;
                            }
                        }
                    }
                }
            } finally {
                sail.close();
            }
        }
    }

    /**
     * Ends the transaction of the given connection: unregisters it, releases
     * the prepare write lock if this connection held it, clears any conflict
     * flagged on the connection and releases its transaction lock. Calling this
     * for a connection without a registered transaction only clears its
     * conflict.
     *
     * @param con the connection whose transaction is ending
     */
    synchronized void end(OptimisticConnection con) {
        Lock lock = transactions.remove(con);
        try {
            if (prepared == con) {
                Lock held = preparedLock;
                // Drop the prepared state so a later end() of the same
                // connection cannot release this stale lock a second time.
                prepared = null;
                preparedLock = null;
                if (held != null) {
                    held.release();
                }
            }
            con.setConclict(null);
        } finally {
            if (lock != null) {
                lock.release();
            }
        }
    }

    /**
     * Tries to upgrade the given connection to exclusive access. This succeeds
     * only when it is the sole open transaction; its current transaction is
     * then ended and re-registered with the write lock of {@link #locker}.
     *
     * @param con the connection requesting exclusive access
     * @return true if the connection now holds the write lock, false if other
     *         transactions are open or the connection has none
     */
    synchronized boolean exclusive(OptimisticConnection con) {
        if (transactions.size() != 1 || !transactions.containsKey(con)) {
            return false;
        }
        end(con);
        try {
            transactions.put(con, locker.getWriteLock());
        } catch (InterruptedException e) {
            // Preserve the interrupt status before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new AssertionError(e);
        }
        return true;
    }

    /**
     * Tests whether the given delta affects a previously evaluated read
     * operation. A copy of the operation's query is modified to include the
     * delta together with an extra marker binding; the query is then evaluated
     * and the delta counts as affecting the operation when the marker binding
     * shows up in any result.
     *
     * @param delta the added or removed statements to test
     * @param op the recorded read operation
     * @param sail connection used to evaluate the modified query
     * @return true if a result carries the marker binding; false if the query
     *         was not modified by the merger or no result carries the marker
     * @throws StoreException if evaluating the query fails
     */
    private boolean effects(Model delta, EvaluateOperation op,
            SailConnection sail) throws StoreException {
        // Work on a clone so the recorded operation's query stays untouched.
        QueryModel query = op.getQueryModel().clone();
        BindingSet bindings = op.getBindingSet();
        boolean includeInferred = op.isIncludeInferred();

        ValueFactory vf = sail.getValueFactory();
        QueryBindingSet marker = new QueryBindingSet();
        marker.addBinding(DELTA_VARNAME, vf.createLiteral(true));
        DeltaMerger merger = new DeltaMerger(delta, query, marker);

        merger.optimize(query, bindings);
        if (!merger.isModified()) {
            // The merger left the query unchanged, so there is nothing to evaluate.
            return false;
        }

        Cursor<? extends BindingSet> result = sail.evaluate(query, bindings,
                includeInferred);
        try {
            BindingSet bs;
            while ((bs = result.next()) != null) {
                if (bs.hasBinding(DELTA_VARNAME)) {
                    return true;
                }
            }
            return false;
        } finally {
            result.close();
        }
    }
}

Go to most recent revision | Compare with Previous | Blame