modules/rdf/store.zzm

rdf-0.0.3 source code

=encoding utf8

=head1 NAME

rdf/store - std/db-backed RDF quad store.

=head1 SYNOPSIS

  from rdf/store import RDFStore;
  from rdf/term import rdf_iri, rdf_literal, rdf_quad;
  
  let store := RDFStore.temp();
  store.install_schema();
  store.add_quad(rdf_quad(
      rdf_iri("http://example.com/s"),
      rdf_iri("http://example.com/p"),
      rdf_literal("value"),
  ));
  
  let quads := store.find(rdf_iri("http://example.com/s"));


=head1 DESCRIPTION

C<RDFStore> stores RDF quads in relational tables managed through
C<std/db>. Terms are interned, quads are indexed for lookup, and all graph
operations use RDF term identity rather than object identity. The schema
must be installed before use.

=head1 EXPORTS

=head2 Classes

=over

=item C<RDFStore>

Construct with C<new RDFStore(dbh: dbh)> for SQLite or with an explicit
C<backend> of C<sqlite>, C<mysql>, or C<postgresql>. C<prefix> controls
the table name prefix and defaults to C<rdf_>.

=over

=item C<< temp() >>

Returns C<new RDFStore(dbh: DB.temp())>.

=item C<< install_schema() >>

Creates the store tables and indexes if needed.

=item C<< schema_version() >>

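Returns the installed schema version as a string (currently C<"1">), or
an empty string if the metadata table has no version row. Throws if the
metadata table itself does not exist; use C<verify_schema(false)> to probe
safely.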

=item C<< verify_schema(Boolean strict := true) >>

Checks that the schema is installed with a supported version (currently
C<1>) and returns true if so. On failure, throws when C<strict> is true;
otherwise returns false.

=item C<< drop_schema() >>

Drops the store tables for this prefix.

=item C<< begin() >>, C<< commit() >>, C<< rollback() >>

Transaction helpers delegated to the database handle.

=item C<< with_transaction(Function todo) >>

Runs C<todo> inside a transaction, committing on success and rolling back
on exception.

=item C<< add_term(term) >>

Interns an RDF term and returns its database id.

=item C<< term_for_key(String key) >>

Returns the RDF term previously interned for C<key>. The default-graph key
always resolves to the default graph; any other unknown key throws.

=item C<< add_quad(RDFQuad quad) >>, C<< add_quads(Array quads) >>

Adds one or more quads. Quads that are already stored are skipped.

=item C<< add_quads_bulk(Array quads) >>

Adds many quads inside a transaction.

=item C<< remove_quad(RDFQuad quad) >>, C<< remove_quads(Array quads) >>

Removes one or more exact quads.

=item C<< remove_match(subject := null, predicate := null, object := null, graph := null) >>

Removes quads matching the supplied RDF term pattern. C<null> is a
wildcard.

=item C<< clear() >>

Removes every quad.

=item C<< clear_graph(graph := null) >>

Removes quads in the supplied graph, or the default graph when omitted.

=item C<< graph_names() >>

Returns named graph terms present in the store.

=item C<< graph_exists(graph := null) >>

Returns whether the supplied graph has any quads.

=item C<< copy_graph(source := null, target := null) >>

Replaces the contents of C<target> with copies of the quads in C<source>.
Either argument defaults to the default graph when omitted.

=item C<< add_graph(source := null, target := null) >>

Adds quads from C<source> into C<target> without clearing C<target>.

=item C<< move_graph(source := null, target := null) >>

Replaces the contents of C<target> with the quads in C<source>, then
clears C<source> unless both name the same graph.

=item C<< replace_graph(graph := null, Array quads := []) >>

Clears C<graph> and inserts the supplied quads into it, replacing each
quad's own graph component with C<graph>.

=item C<< merge_store(RDFStore other) >>

Adds all quads from another store.

=item C<< count_quads() >>, C<< count_terms() >>

Return the number of stored quads and of interned terms, respectively.

=item C<< statistics() >>

Returns a dictionary of store counts and graph information.

=item C<< diagnostics() >>

Returns schema and backend diagnostic information.

=item C<< explain_find(subject := null, predicate := null, object := null, graph := null) >>

Returns a description of the lookup path selected for a pattern.

=item C<< explicit_find(subject := null, predicate := null, object := null, graph := null) >>

Returns matching stored quads without entailment.

=item C<< find(subject := null, predicate := null, object := null, graph := null) >>

Returns matching quads. Subclasses may add inferred quads.

=item C<< serialize(serializer := null) >>

Serializes all quads in the store. Defaults to C<NQuadsSerializer> when
no serializer object is supplied.

=item C<< serialize_to(Path path, serializer := null) >>

Serializes all quads in the store to C<path>. Defaults to
C<NQuadsSerializer> when no serializer object is supplied.

=back

=back

=head1 COPYRIGHT AND LICENCE

B<< rdf/store >> is copyright Toby Inkster.

It is free software; you may redistribute it and/or modify it under
the terms of either the Artistic License 1.0 or the GNU General Public
License version 2.

=cut

from rdf/term import
	RDFBlank,
	RDFDefaultGraph,
	RDFIRI,
	RDFLiteral,
	RDFQuad,
	rdf_blank,
	rdf_default_graph,
	rdf_iri,
	rdf_literal,
	rdf_quad,
	rdf_term_hash,
	rdf_term_kind;
from rdf/graph import rdf_quad_key;
from rdf/serializer/nquads import NQuadsSerializer;
from std/db import DB;
from std/io import Path;
from std/string import join;

// Validates a table-name prefix before it is concatenated into SQL.
// Accepts an ASCII letter followed by letters, digits or underscores;
// anything else dies. Returns the prefix unchanged.
function _ident ( String raw ) {
	if ( not ( raw ~ /^[A-Za-z][A-Za-z0-9_]*$/ ) ) {
		die "rdf: unsafe SQL identifier prefix";
	}
	return raw;
}

// Normalises a backend name case-insensitively. Returns the lower-cased
// name for the three supported backends; dies for anything else, quoting
// the caller's original spelling in the message.
function _backend ( String raw ) {
	let name := lc(raw);
	if ( name eq "sqlite" or name eq "mysql" or name eq "postgresql" ) {
		return name;
	}
	die "rdf: unsupported backend '" _ raw _ "'";
}

// RDF quad store persisted in relational tables through a std/db handle.
// Terms are interned in <prefix>terms keyed by rdf_term_hash(); each quad
// is a row of four term keys (s, p, o, g) in <prefix>quads.
class RDFStore {
	// Database handle; __build__ substitutes a temporary database if none
	// is supplied.
	let dbh with get := null;
	// One of "sqlite", "mysql" or "postgresql" (normalised by __build__).
	let String backend with get := "sqlite";
	// Table/index name prefix. It is concatenated directly into SQL, so
	// __build__ validates it with _ident().
	let String prefix with get := "rdf_";

	// Convenience constructor for a store backed by a temporary database.
	static method temp () {
		// sqlite_unicode keeps text round-trips lossless on zuzu.pl;
		// the other runtimes ignore the setting.
		return new RDFStore(dbh: DB.temp({ sqlite_unicode: true }));
	}

	// Post-construction hook: fills in a default handle and validates the
	// backend name and table prefix (both die on invalid input).
	method __build__ () {
		dbh := DB.temp({ sqlite_unicode: true }) if dbh == null;
		backend := _backend(backend);
		prefix := _ident(prefix);
	}

	// Returns the prefixed table name for `name`.
	method _table ( String name ) {
		return prefix _ name;
	}

	// Creates the meta, terms and quads tables if they are absent, creates
	// the secondary indexes when the schema did not already verify, and
	// records schema version "1" unless a version row already exists.
	// Returns self.
	method install_schema () {
		let first_install := not self.verify_schema(false);
		dbh.prepare(
			"create table if not exists " _ self._table("meta") _
			" (k varchar(64) primary key, v varchar(255))",
		).execute();
		dbh.prepare(
			"create table if not exists " _ self._table("terms") _
			" (term_key varchar(128) primary key, kind varchar(16) not null, " _
			"value text, lang varchar(64), datatype text)",
		).execute();
		dbh.prepare(
			"create table if not exists " _ self._table("quads") _
			" (s varchar(128) not null, p varchar(128) not null, " _
			"o varchar(128) not null, g varchar(128) not null, " _
			"primary key (s, p, o, g))",
		).execute();
		self._install_indexes() if first_install;
		self._insert_meta( "schema_version", "1" );
		return self;
	}

	// Creates the secondary quad indexes. Index names carry the prefix so
	// stores with different prefixes do not collide.
	// SQLite and PostgreSQL accept "if not exists", which makes re-running
	// install_schema() after an interrupted install safe (the indexes may
	// already exist even though no version row was written). MySQL has no
	// such clause, so there we rely on the caller's first-install check.
	method _install_indexes () {
		let quads := self._table("quads");
		let guard := backend eq "mysql" ? "" : "if not exists ";
		dbh.prepare(
			"create index " _ guard _ prefix _ "quads_pog on " _ quads _
			" (p, o, g)",
		).execute();
		dbh.prepare(
			"create index " _ guard _ prefix _ "quads_sp on " _ quads _
			" (s, p)",
		).execute();
		dbh.prepare(
			"create index " _ guard _ prefix _ "quads_gsp on " _ quads _
			" (g, s, p)",
		).execute();
		return self;
	}

	// Returns the stored schema version string, or "" when the meta table
	// has no version row. The query throws if the meta table is missing;
	// verify_schema() relies on that.
	method schema_version () {
		let q := dbh.prepare(
			"select v from " _ self._table("meta") _ " where k = ?",
		);
		q.execute("schema_version");
		let row := q.next_array();
		return row == null ? "" : row[0];
	}

	// Returns true when the schema reports version "1". Otherwise dies when
	// `strict` is true, or returns false when it is not.
	method verify_schema ( Boolean strict := true ) {
		let ok := false;
		try {
			ok := self.schema_version() eq "1";
		}
		catch {
			// schema_version() threw: the meta table is not there.
			die "rdf: schema is not installed" if strict;
			return false;
		}
		if ( not ok and strict ) {
			die "rdf: unsupported or missing RDF store schema";
		}
		return ok;
	}

	// Inserts meta key `k` with value `v` only if `k` has no row yet;
	// an existing value is left untouched. Returns self.
	method _insert_meta ( String k, String v ) {
		let q := dbh.prepare(
			"select v from " _ self._table("meta") _ " where k = ?",
		);
		q.execute(k);
		if ( not q.next_array() ) {
			dbh.prepare(
				"insert into " _ self._table("meta") _ " (k, v) values (?, ?)",
			).execute( k, v );
		}
		return self;
	}

	// Drops the quads, terms and meta tables for this prefix. Returns self.
	method drop_schema () {
		dbh.prepare( "drop table if exists " _ self._table("quads") ).execute();
		dbh.prepare( "drop table if exists " _ self._table("terms") ).execute();
		dbh.prepare( "drop table if exists " _ self._table("meta") ).execute();
		return self;
	}

	// Transaction helpers delegating to the database handle; each returns
	// self so calls can be chained.
	method begin () {
		dbh.begin();
		return self;
	}

	method commit () {
		dbh.commit();
		return self;
	}

	method rollback () {
		dbh.rollback();
		return self;
	}

	// Runs todo(self) inside a transaction. Commits and returns todo's
	// result on success; rolls back and rethrows on exception.
	method with_transaction ( Function todo ) {
		self.begin();
		try {
			let result := todo(self);
			self.commit();
			return result;
		}
		catch ( Exception e ) {
			self.rollback();
			throw e;
		}
	}

	// Interns `term` and returns its key (rdf_term_hash of the term).
	// An already-interned term is not rewritten.
	method add_term ( term ) {
		let key := rdf_term_hash(term);
		let q := dbh.prepare(
			"select term_key from " _ self._table("terms") _ " where term_key = ?",
		);
		q.execute(key);
		return key if q.next_array();

		let kind := rdf_term_kind(term);
		// Terms that are neither IRIs, blanks nor literals (e.g. the default
		// graph) are stored with empty value/lang/datatype columns.
		let value := "";
		let lang := "";
		let datatype := "";
		if ( term instanceof RDFIRI or term instanceof RDFBlank ) {
			value := term.get_value();
		}
		else if ( term instanceof RDFLiteral ) {
			value := term.get_value();
			lang := term.get_lang();
			datatype := term.get_datatype().get_value();
		}
		dbh.prepare(
			"insert into " _ self._table("terms") _
			" (term_key, kind, value, lang, datatype) values (?, ?, ?, ?, ?)",
		).execute( key, kind, value, lang, datatype );
		return key;
	}

	// Rebuilds the RDF term stored under `key`. The default-graph key is
	// resolved without a lookup; any other unknown key dies.
	method term_for_key ( String key ) {
		if ( key eq rdf_term_hash(rdf_default_graph()) ) {
			return rdf_default_graph();
		}
		let q := dbh.prepare(
			"select kind, value, lang, datatype from " _ self._table("terms") _
			" where term_key = ?",
		);
		q.execute(key);
		let row := q.next_dict();
		die "rdf: unknown term key" if row == null;
		return rdf_iri(row{value}) if row{kind} eq "iri";
		return rdf_blank(row{value}) if row{kind} eq "blank";
		return rdf_default_graph() if row{kind} eq "default";
		// Any remaining kind is treated as a literal.
		return rdf_literal(
			row{value},
			row{lang},
			rdf_iri(row{datatype}),
		);
	}

	// Interns the quad's four terms and inserts the quad unless it is
	// already stored. Returns the quad.
	method add_quad ( RDFQuad quad ) {
		let s := self.add_term(quad.get_subject());
		let p := self.add_term(quad.get_predicate());
		let o := self.add_term(quad.get_object());
		let g := self.add_term(quad.get_graph());
		let exists := dbh.prepare(
			"select s from " _ self._table("quads") _
			" where s = ? and p = ? and o = ? and g = ?",
		);
		exists.execute( s, p, o, g );
		if ( not exists.next_array() ) {
			dbh.prepare(
				"insert into " _ self._table("quads") _
				" (s, p, o, g) values (?, ?, ?, ?)",
			).execute( s, p, o, g );
		}
		return quad;
	}

	// Adds each quad in turn. Returns self.
	method add_quads ( Array quads ) {
		for ( let quad in quads ) {
			self.add_quad(quad);
		}
		return self;
	}

	// Adds all quads inside a single transaction. Returns self.
	method add_quads_bulk ( Array quads ) {
		return self.with_transaction(function ( store ) {
			store.add_quads(quads);
			return store;
		});
	}

	// Deletes the exact stored quad, if present. Terms stay interned.
	// Returns self.
	method remove_quad ( RDFQuad quad ) {
		dbh.prepare(
			"delete from " _ self._table("quads") _
			" where s = ? and p = ? and o = ? and g = ?",
		).execute(
			rdf_term_hash(quad.get_subject()),
			rdf_term_hash(quad.get_predicate()),
			rdf_term_hash(quad.get_object()),
			rdf_term_hash(quad.get_graph()),
		);
		return self;
	}

	// Deletes each quad in turn. Returns self.
	method remove_quads ( Array quads ) {
		for ( let quad in quads ) {
			self.remove_quad(quad);
		}
		return self;
	}

	// Deletes stored quads matching the pattern (null = wildcard).
	// Only stored quads can be deleted, so this matches against
	// explicit_find() rather than find(), which subclasses may extend with
	// inferred quads. Returns self.
	method remove_match ( subject := null, predicate := null,
		object := null, graph := null ) {
		return self.remove_quads(
			self.explicit_find( subject, predicate, object, graph ),
		);
	}

	// Deletes every quad (interned terms are kept). Returns self.
	method clear () {
		dbh.prepare( "delete from " _ self._table("quads") ).execute();
		return self;
	}

	// Deletes the quads of `graph`, or of the default graph when null.
	// Returns self.
	method clear_graph ( graph := null ) {
		let g := graph == null ? rdf_default_graph() : graph;
		dbh.prepare(
			"delete from " _ self._table("quads") _ " where g = ?",
		).execute(rdf_term_hash(g));
		return self;
	}

	// Returns the distinct non-default graph terms, ordered by term key.
	method graph_names () {
		let q := dbh.prepare(
			"select distinct g from " _ self._table("quads") _
			" where g <> ? order by g",
		);
		q.execute(rdf_term_hash(rdf_default_graph()));
		let out := [];
		for ( let row in q.all_array() ) {
			out.push(self.term_for_key(row[0]));
		}
		return out;
	}

	// Returns true if `graph` (default graph when null) has any quad.
	method graph_exists ( graph := null ) {
		let g := graph == null ? rdf_default_graph() : graph;
		let q := dbh.prepare(
			"select g from " _ self._table("quads") _
			" where g = ? limit 1",
		);
		q.execute(rdf_term_hash(g));
		return not (q.next_array() == null);
	}

	// Replaces `target` with copies of the quads in `source` (each defaults
	// to the default graph). The source quads are read before the target
	// is cleared, so copying a graph onto itself leaves it intact.
	method copy_graph ( source := null, target := null ) {
		let from_graph := source == null ? rdf_default_graph() : source;
		let to_graph := target == null ? rdf_default_graph() : target;
		let source_quads := self.find( null, null, null, from_graph );
		self.clear_graph(to_graph);
		for ( let quad in source_quads ) {
			self.add_quad(rdf_quad(
				quad.get_subject(),
				quad.get_predicate(),
				quad.get_object(),
				to_graph,
			));
		}
		return self;
	}

	// Adds copies of the quads in `source` to `target` without clearing
	// `target` first. Returns self.
	method add_graph ( source := null, target := null ) {
		let from_graph := source == null ? rdf_default_graph() : source;
		let to_graph := target == null ? rdf_default_graph() : target;
		for ( let quad in self.find( null, null, null, from_graph ) ) {
			self.add_quad(rdf_quad(
				quad.get_subject(),
				quad.get_predicate(),
				quad.get_object(),
				to_graph,
			));
		}
		return self;
	}

	// copy_graph(source, target), then clears `source` unless it is the
	// same graph as `target`. Returns self.
	method move_graph ( source := null, target := null ) {
		let from_graph := source == null ? rdf_default_graph() : source;
		let to_graph := target == null ? rdf_default_graph() : target;
		self.copy_graph( from_graph, to_graph );
		self.clear_graph(from_graph)
			unless rdf_term_hash(from_graph) eq rdf_term_hash(to_graph);
		return self;
	}

	// Clears `graph` and inserts `quads` into it, replacing each quad's
	// own graph component. Returns self.
	method replace_graph ( graph := null, Array quads := [] ) {
		let g := graph == null ? rdf_default_graph() : graph;
		self.clear_graph(g);
		for ( let quad in quads ) {
			self.add_quad(rdf_quad(
				quad.get_subject(),
				quad.get_predicate(),
				quad.get_object(),
				g,
			));
		}
		return self;
	}

	// Adds every quad returned by other.find() in one transaction.
	method merge_store ( RDFStore other ) {
		return self.add_quads_bulk(other.find());
	}

	// Number of rows in the quads table.
	method count_quads () {
		let q := dbh.prepare(
			"select count(*) from " _ self._table("quads"),
		);
		q.execute();
		return 0 + q.next_array()[0];
	}

	// Number of rows in the terms table.
	method count_terms () {
		let q := dbh.prepare(
			"select count(*) from " _ self._table("terms"),
		);
		q.execute();
		return 0 + q.next_array()[0];
	}

	// Returns backend, prefix, schema version and quad/term/named-graph
	// counts. Throws if the store tables are missing.
	method statistics () {
		return {
			backend: backend,
			prefix: prefix,
			schema_version: self.schema_version(),
			quad_count: self.count_quads(),
			term_count: self.count_terms(),
			graph_count: self.graph_names().length(),
		};
	}

	// Returns statistics() plus schema_ok and the expected index names.
	// statistics() queries the store tables and throws when the schema is
	// missing; in that case a reduced dictionary (backend, prefix and an
	// empty schema_version) is reported instead, so diagnostics still work
	// on a store whose schema is absent.
	method diagnostics () {
		let stats := null;
		try {
			stats := self.statistics();
		}
		catch {
			stats := {
				backend: backend,
				prefix: prefix,
				schema_version: "",
			};
		}
		stats{schema_ok} := self.verify_schema(false);
		stats{indexes} := [
			prefix _ "quads_pog",
			prefix _ "quads_sp",
			prefix _ "quads_gsp",
		];
		return stats;
	}

	// Describes which index the pattern is expected to use (a heuristic
	// over which positions are bound; the database is not consulted).
	method explain_find ( subject := null, predicate := null,
		object := null, graph := null ) {
		let selected_index := "primary";
		if ( not (graph == null) and not (subject == null) and not (predicate == null) ) {
			selected_index := prefix _ "quads_gsp";
		}
		else if ( not (predicate == null) and not (object == null) ) {
			selected_index := prefix _ "quads_pog";
		}
		else if ( not (subject == null) and not (predicate == null) ) {
			selected_index := prefix _ "quads_sp";
		}
		return {
			backend: backend,
			index: selected_index,
			subject_bound: not (subject == null),
			predicate_bound: not (predicate == null),
			object_bound: not (object == null),
			graph_bound: not (graph == null),
		};
	}

	// Returns stored quads matching the pattern (null = wildcard), with no
	// entailment.
	method explicit_find ( subject := null, predicate := null, object := null,
	graph := null ) {
		let clauses := [];
		let binds := [];
		if ( not (subject == null) ) {
			clauses.push("s = ?");
			binds.push(rdf_term_hash(subject));
		}
		if ( not (predicate == null) ) {
			clauses.push("p = ?");
			binds.push(rdf_term_hash(predicate));
		}
		if ( not (object == null) ) {
			clauses.push("o = ?");
			binds.push(rdf_term_hash(object));
		}
		if ( not (graph == null) ) {
			clauses.push("g = ?");
			binds.push(rdf_term_hash(graph));
		}

		let sql := "select s, p, o, g from " _ self._table("quads");
		sql _= " where " _ join( " and ", clauses ) if clauses.length() > 0;
		let q := dbh.prepare(sql);
		// execute() takes positional binds, so dispatch on the 0-4 binds.
		if ( binds.length() == 0 ) {
			q.execute();
		}
		else if ( binds.length() == 1 ) {
			q.execute(binds[0]);
		}
		else if ( binds.length() == 2 ) {
			q.execute( binds[0], binds[1] );
		}
		else if ( binds.length() == 3 ) {
			q.execute( binds[0], binds[1], binds[2] );
		}
		else {
			q.execute( binds[0], binds[1], binds[2], binds[3] );
		}
		let rows := q.all_dict();
		let out := [];
		for ( let row in rows ) {
			out.push(rdf_quad(
				self.term_for_key(row{s}),
				self.term_for_key(row{p}),
				self.term_for_key(row{o}),
				self.term_for_key(row{g}),
			));
		}
		return out;
	}

	// Pattern lookup; the base class returns explicit_find(). Subclasses
	// may override this to add inferred quads.
	method find ( subject := null, predicate := null, object := null,
	graph := null ) {
		return self.explicit_find( subject, predicate, object, graph );
	}

	// Serializes find() with `serializer`, or a new NQuadsSerializer when
	// none is supplied.
	method serialize ( serializer := null ) {
		let actual_serializer := serializer == null
			? new NQuadsSerializer()
			: serializer;
		return actual_serializer.serialize(self.find());
	}

	// Writes serialize(serializer) to `path` as UTF-8. Returns self.
	method serialize_to ( Path path, serializer := null ) {
		path.spew_utf8(self.serialize(serializer));
		return self;
	}
}