/*  Part of SWI-Prolog

    Author:        Jan Wielemaker
    E-mail:        J.Wielemaker@vu.nl
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2006-2015, University of Amsterdam
                              VU University Amsterdam
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

:- module(rdf_persistency,
          [ rdf_attach_db/2,            % +Directory, +Options
            rdf_detach_db/0,            % Detach current database
            rdf_current_db/1,           % -Directory
            rdf_persistency/2,          % +Graph, +Bool
            rdf_flush_journals/1,       % +Options
            rdf_persistency_property/1, % ?Property
            rdf_journal_file/2,         % ?Graph, ?JournalFile
            rdf_snapshot_file/2,        % ?Graph, ?SnapshotFile
            rdf_db_to_file/2            % ?Graph, ?FileBase
          ]).
:- use_module(library(semweb/rdf_db)).
:- use_module(library(filesex)).
:- use_module(library(lists)).
:- use_module(library(uri)).
:- use_module(library(debug)).
:- use_module(library(option)).
:- use_module(library(error)).
:- use_module(library(thread)).
:- use_module(library(apply)).

RDF persistency plugin

This module provides persistency for rdf_db.pl, using the rdf_monitor/2 predicate to track changes to the repository. Where previous versions used autosave of the whole database in the quick-load format of rdf_db, this version uses a quick-load file per source (the 4th argument of rdf/4) and journalling for edit operations.

The result is safe, avoids frequent small changes to large files (which make synchronisation and backup expensive) and avoids long disruptions of the server while autosaving. Only loading large files disrupts service for some time.

The persistent backup of the database is realised in a directory, using a lock file to avoid corruption due to concurrent access. Each source is represented by two files, the latest snapshot and a journal. The state is restored by loading the snapshot and replaying the journal. The predicate rdf_flush_journals/1 can be used to create fresh snapshots and delete the journals.
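As a rough illustration of "load the snapshot, then replay the journal", the sketch below models a graph as an ordered set of rdf(S,P,O) terms. The predicates restore/3 and replay/3 are hypothetical; the sketch assumes ground journal terms and handles only the assert/3, retract/3 and start/end records, whereas the real load_journal/2 calls rdf_assert/4 and rdf_retractall/4 on the triple store.

```prolog
:- use_module(library(apply)).
:- use_module(library(ordsets)).

% restore(+Snapshot, +Journal, -State)
% Hypothetical: State is the ordered set of rdf(S,P,O) triples obtained by
% starting from the snapshot and replaying the journal terms in order.
restore(Snapshot, Journal, State) :-
    list_to_ord_set(Snapshot, State0),
    foldl(replay, Journal, State0, State).

% replay(+JournalTerm, +State0, -State)
replay(assert(S,P,O), State0, State) :-
    ord_add_element(State0, rdf(S,P,O), State).
replay(retract(S,P,O), State0, State) :-
    ord_del_element(State0, rdf(S,P,O), State).
replay(start(_), State, State).         % journal opened: no data change
replay(end(_), State, State).           % journal closed: no data change
```

For example, replaying `[start([time(0)]), assert(c,p,3), retract(a,p,1)]` on the snapshot `[rdf(a,p,1), rdf(b,p,2)]` leaves `[rdf(b,p,2), rdf(c,p,3)]`.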

See also
- rdf_edit.pl
To be done
- If there is a complete `.new' snapshot and no journal, we should move the .new to the plain snapshot name as a means of recovery.
- Backup of each graph using one or two files is very costly if there are many graphs. Although the currently used subdirectories avoid hitting OS limits early, this is still not ideal. Probably we should collect (small, older?) files and combine them into a single quick load file. We could call this (similar to GIT) a `pack'.
:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

:- meta_predicate
    no_agc(0).

:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
 rdf_attach_db(+Directory, +Options) is det
Start persistent operations using Directory as place to store files. There are several cases:

Options:

access(+AccessMode)
One of auto (default), read_write or read_only. Read-only access implies that the RDF store is not locked. It is read at startup and all modifications to the data are temporary. The default auto mode is read_write if the directory is writeable and the lock can be acquired. Otherwise it reverts to read_only.
concurrency(+Jobs)
Number of threads to use for loading the initial database. If not provided it is the number of CPUs as obtained from the flag cpu_count.
max_open_journals(+Count)
Maximum number of journals kept open. If not provided, the default is 10. See limit_fd_pool/0.
directory_levels(+Count)
Number of levels of intermediate directories for storing the graph files. Default is 2.
silent(+BoolOrBrief)
If true, do not print informational messages; if brief, show only minimal feedback. If the option is not given, the default is brief, or true if the Prolog flag verbose is set to silent.
log_nested_transactions(+Boolean)
If true, nested log transactions are added to the journal information. By default (false), no log-term is added for nested transactions.
Errors
- existence_error(source_sink, Directory)
- permission_error(write, directory, Directory)
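The fallback rules of the default access mode can be summarised as a small decision table. The sketch below is a simplification under stated assumptions: access_mode/4 is a hypothetical helper, the atom auto stands for "no access option given", and the error handling of rdf_attach_db/2 (for example the permission error raised when read_write is requested for a directory that is not writable) is left out.

```prolog
% access_mode(+Requested, +Writable, +LockAcquired, -Mode)
% Hypothetical summary of how the access mode is chosen.
% Requested is read_only, read_write or auto (no access option given).
access_mode(read_only,  _,        _,      read_only).
access_mode(read_write, _,        _,      read_write).
access_mode(auto,       Writable, Locked, Mode) :-
    (   Writable == true,
        Locked == true
    ->  Mode = read_write
    ;   Mode = read_only               % not writable, or lock held elsewhere
    ).
```

In the listing, a failing lock shows up as `error(permission_error(lock, rdf_db, _), _)`, which rdf_attach_db/2 catches before retrying with `access(read_only)`.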
rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   print_message(warning,
                      error(permission_error(write, directory, Directory))),
        print_message(warning, rdf(read_only)),
        rdf_attach_db_ro(Directory, Options)
    ).
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).


rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Directory)
    ->  true                        % update settings?
    ;   rdf_detach_db,
        mkdir(Directory),
        lock_db(Directory),
        assert(rdf_directory(Directory)),
        assert_options(Options),
        stop_monitor,               % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ solutions(all)
                       ]),
    (   exists_directory(Directory)
    ->  access_file(Directory, write)
    ;   catch(make_directory(Directory), _, fail)
    ),
    !,
    rdf_attach_db(Directory, Options).
rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
    absolute_file_name(DirSpec,     % permission error
                       Directory,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Directory).
 rdf_attach_db_ro(+Directory, +Options)
Open an RDF database in read-only mode.
rdf_attach_db_ro(Directory, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Directory)),
    assert_options(Options),
    stop_monitor,           % make sure not to register load
    no_agc(load_db).


assert_options([]).
assert_options([H|T]) :-
    (   option_type(H, Check)
    ->  Check,
        assert(rdf_option(H))
    ;   true                        % ignore options we do not understand
    ),
    assert_options(T).

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([read_write,
                                                       read_only]), X)).
 rdf_persistency_property(?Property) is nondet
True when Property is a property of the current persistent database. Exposes the properties that can be passed as options to rdf_attach_db/2. Specifically, rdf_persistency_property(access(read_only)) is true iff the database is mounted in read-only mode. In addition, the following property is supported:
directory(Dir)
The directory in which the database resides.
rdf_persistency_property(Property) :-
    var(Property),
    !,
    rdf_persistency_property_(Property).
rdf_persistency_property(Property) :-
    rdf_persistency_property_(Property),
    !.

rdf_persistency_property_(Property) :-
    rdf_option(Property).
rdf_persistency_property_(directory(Dir)) :-
    rdf_directory(Dir).
 no_agc(:Goal)
Run Goal with atom garbage collection disabled. Loading an RDF database creates large amounts of atoms we know are not garbage.
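no_agc/1 is an instance of a general pattern: temporarily change a Prolog flag and restore it however the goal exits. A hypothetical generalisation, with_flag/3, could look like this; it relies only on current_prolog_flag/2, set_prolog_flag/2 and setup_call_cleanup/3.

```prolog
:- meta_predicate with_flag(+, +, 0).

% with_flag(+Flag, +Value, :Goal)
% Hypothetical: run Goal with Flag set to Value. The old value is restored
% when Goal succeeds deterministically, fails or raises an exception, or
% when its remaining choice points are cut.
with_flag(Flag, Value, Goal) :-
    current_prolog_flag(Flag, Old),
    setup_call_cleanup(
        set_prolog_flag(Flag, Value),
        Goal,
        set_prolog_flag(Flag, Old)).
```

With this helper, `no_agc(Goal)` would amount to `with_flag(agc_margin, 0, Goal)`.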
no_agc(Goal) :-
    current_prolog_flag(agc_margin, Old),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, Old)).
 rdf_detach_db is det
Detach from the current database. Succeeds silently if no database is attached. Normally called at the end of the program through at_halt/1.
rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Dir))
    ->  debug(halt, 'DB Directory: ~w', [Dir]),
        save_prefixes(Dir),
        retractall(rdf_option(_)),
        retractall(source_journal_fd(_,_)),
        unlock_db(Dir)
    ;   true
    ).
 rdf_current_db(?Dir)
True if Dir is the current RDF persistent database.
rdf_current_db(Directory) :-
    rdf_directory(Dir),
    !,
    Dir = Directory.
 rdf_flush_journals(+Options)
Flush dirty journals. Options:
min_size(+KB)
Only flush if journal is over KB in size.
graph(+Graph)
Only flush the journal of Graph
To be done
- Provide a default for min_size?
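The min_size(+KB) test compares the journal size in bytes, divided by 1024, against KB. The hypothetical flush_needed/2 below isolates that decision; it takes the size directly instead of calling size_file/2 and leaves out the separate check that skips graphs without a journal file.

```prolog
% flush_needed(+SizeBytes, +Options) is semidet.
% Hypothetical: true if a journal of SizeBytes bytes should be replaced
% by a fresh snapshot, following the min_size(+KB) option.
flush_needed(SizeBytes, Options) :-
    (   memberchk(min_size(KB), Options)
    ->  SizeBytes / 1024 >= KB        % smaller journals are left alone
    ;   true                          % no min_size: always flush
    ).
```

Under this reading, `rdf_flush_journals([min_size(100)])` only snapshots graphs whose journal has reached 100 KB.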
rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),          % File is the resolved journal path
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).

                 /*******************************
                 *             LOAD             *
                 *******************************/
 load_db is det
Reload the database from the directory specified by rdf_directory/1. First we find all named graphs using find_dbs/4 and then we load them.
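load_sources/4 builds one numbered goal per file with make_goals/5 and runs them with concurrent/3 from library(thread), never starting more workers than there are goals. The sketch below repeats that pattern with hypothetical names (make_jobs/4, load_item/3, load_all/2); load_item/3 merely records the item instead of loading a file.

```prolog
:- use_module(library(thread)).

:- dynamic loaded/1.

% make_jobs(+Items, +Index, +Total, -Goals): one numbered goal per item.
make_jobs([], _, _, []).
make_jobs([X|Xs], I, Total, [load_item(X, I, Total)|Goals]) :-
    I2 is I + 1,
    make_jobs(Xs, I2, Total, Goals).

% load_item(+Item, +Nth, +Total): hypothetical stand-in for loading a file.
load_item(Item, _Nth, _Total) :-
    assertz(loaded(Item)).

% load_all(+Items, +Jobs): process Items using at most Jobs threads.
load_all([], _) :- !.
load_all(Items, Jobs) :-
    length(Items, Count),
    RunJobs is min(Count, Jobs),      % no more workers than goals
    make_jobs(Items, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).
```

Because the goals run in parallel, the order in which items are recorded is not fixed.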
load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals, Journals, Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples = Triples1 - Triples0,
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).

load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).
 make_goals(+DBs, +Silent, +Index, +Total, -Goals)
make_goals([], _, _, _, []).
make_goals([DB|T0], Silent, I,  Total,
           [load_source(DB, Silent, I, Total)|T]) :-
    I2 is I + 1,
    make_goals(T0, Silent, I2, Total, T).

verbosity(Silent) :-
    rdf_option(silent(Silent)),
    !.
verbosity(Silent) :-
    current_prolog_flag(verbose, silent),
    !,
    Silent = true.
verbosity(brief).
 concurrency(-Jobs)
Number of jobs to run concurrently.
concurrency(Jobs) :-
    rdf_option(concurrency(Jobs)),
    !.
concurrency(Jobs) :-
    current_prolog_flag(cpu_count, Jobs),
    Jobs > 0,
    !.
concurrency(1).

cpu_stat_key(1, cputime) :- !.
cpu_stat_key(_, process_cputime).
 find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize) is det
Scan the persistent database and return a list of snapshots and journals, both sorted by file-size. Each term is of the form
db(Size, Ext, DB, DBFile, Depth)
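Sorting by size works because the file size is the first argument of the db/5 terms: in the standard order of terms, compound terms with the same name and arity are compared argument by argument from the left. The sketch below mirrors the partition/4 and sort/2 steps of find_dbs/4 on hand-made terms; split_and_sort/3 and is_snapshot/1 are hypothetical names and the file paths are made up.

```prolog
:- use_module(library(apply)).

% is_snapshot(+DBTerm): snapshots have the extension trp.
is_snapshot(db(_Size, trp, _Graph, _File, _Depth)).

% split_and_sort(+Scanned, -SnapBySize, -JournalBySize)
split_and_sort(Scanned, SnapBySize, JournalBySize) :-
    partition(is_snapshot, Scanned, Snapshots, Journals),
    sort(Snapshots, SnapBySize),      % ordered by the 1st argument (Size)
    sort(Journals, JournalBySize).    % sort/2 also removes duplicates
```

Duplicate removal by sort/2 is harmless here, since every term carries a distinct file path.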
find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Files),
    phrase(scan_db_files(Files, Dir, '.', 0), Scanned),
    maplist(db_graph, Scanned, UnsortedGraphs),
    sort(UnsortedGraphs, Graphs),
    (   consider_reindex_db(Dir, Graphs, Scanned)
    ->  find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, Scanned, Snapshots, Journals),
        sort(Snapshots, SnapBySize),
        sort(Journals, JournalBySize)
    ).

consider_reindex_db(Dir, Graphs, Scanned) :-
    length(Graphs, Count),
    Count > 0,
    DepthNeeded is floor(log(Count)/log(256)),
    (   maplist(depth_db(DepthNow), Scanned)
    ->  (   DepthNeeded > DepthNow
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(DepthNow))),
            fail
        )
    ;   true
    ),
    reindex_db(Dir, DepthNeeded).

db_is_snapshot(Term) :-
    arg(2, Term, trp).

db_graph(Term, DB) :-
    arg(3, Term, DB).

db_file_name(Term, File) :-
    arg(4, Term, File).

depth_db(Depth, DB) :-
    arg(5, DB, Depth).
 scan_db_files(+Files, +Dir, +Prefix, +Depth)// is det
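Produces a list of db(Size, Ext, DB, File, Depth) terms for all recognised RDF database files. File is the absolute path of the file, found in Dir or one of its subdirectories, and Depth is the number of subdirectory levels below Dir.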
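The core of scan_db_files//4 is a DCG that emits a term for every name with a recognised extension and skips everything else. The simplified, hypothetical db_entries//1 below works on a plain list of names, without touching the file system or descending into subdirectories.

```prolog
% known_ext(?Ext): extensions used for snapshots and journals.
known_ext(trp).
known_ext(jrn).

% db_entries(+Names)// : emit db(Base, Ext) for each recognised file name.
db_entries([]) -->
    [].
db_entries([Name|T]) -->
    { file_name_extension(Base, Ext, Name),
      known_ext(Ext)
    },
    !,
    [ db(Base, Ext) ],
    db_entries(T).
db_entries([_|T]) -->                 % unknown extension: skip the name
    db_entries(T).
```

Calling `phrase(db_entries(Names), Entries)` collects the emitted terms in Entries.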
scan_db_files([], _, _, _) -->
    [].
scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
    { nofollow(Nofollow) },
    !,
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([File|T], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, File),
      db_extension(Ext),
      !,
      rdf_db_to_file(DB, Base),
      directory_file_path(Prefix, File, DBFile),
      directory_file_path(Dir, DBFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, DB, AbsFile, Depth) ],
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([D|T], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, D, SubD),
      directory_file_path(Dir, SubD, AbsD),
      exists_directory(AbsD),
      \+ read_link(AbsD, _, _),    % Do not follow links
      !,
      directory_files(AbsD, SubFiles),
      SubDepth is Depth + 1
    },
    scan_db_files(SubFiles, Dir, SubD, SubDepth),
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([_|T], Dir, Prefix, Depth) -->
    scan_db_files(T, Dir, Prefix, Depth).

nofollow(.).
nofollow(..).

db_extension(trp).
db_extension(jrn).

:- public load_source/4.                % called through make_goals/5

load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Count0),
    statistics(cputime, T0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    graph_triple_count(Graph, Count1),
    Count is Count1 - Count0,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count, Nth, Total)))).


graph_triple_count(Graph, Count) :-
    rdf_statistics(triples_by_graph(Graph, Count)),
    !.
graph_triple_count(_, 0).
 attach_graph(+Graph, +Options) is det
Load triples and reload journal from the indicated snapshot file.
attach_graph(Graph, Options) :-
    (   option(silent(true), Options)
    ->  Level = silent
    ;   Level = informational
    ),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    (   exists_db(JournalFile)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(JournalFile, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).

message_level(true, silent) :- !.
message_level(_, informational).


                 /*******************************
                 *         LOAD JOURNAL         *
                 *******************************/
 load_journal(+File:atom, +DB:atom) is det
Process transactions from the RDF journal File, replaying them into the named graph DB.
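A journal is a sequence of Prolog terms, one per line, read until end_of_file. The hypothetical read_journal_terms/2 below uses the same read loop as process_journal/3 but collects the terms in a list instead of applying them, which can be handy for inspecting a journal. Here it reads from a string stream; for a real journal file one would open it with encoding(utf8), as load_journal/2 does.

```prolog
% read_journal_terms(+In, -Terms): all terms on In up to end_of_file.
read_journal_terms(In, Terms) :-
    read_term(In, T0, []),
    read_journal_terms(T0, In, Terms).

read_journal_terms(end_of_file, _, []) :- !.
read_journal_terms(Term, In, [Term|Terms]) :-
    read_term(In, Next, []),
    read_journal_terms(Next, In, Terms).
```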
load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, In, [encoding(utf8)]),
        ( read(In, T0),
          process_journal(T0, In, DB)
        ),
        close(In)).

process_journal(end_of_file, _, _) :- !.
process_journal(Term, In, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   throw(error(type_error(journal_term, Term), _))
    ),
    read(In, T2),
    process_journal(T2, In, DB).

process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S,P,O,DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S,P,O,DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S,P,O,DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S,P,O,DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S,P,O,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).      % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).      % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
process_journal_term(end(_,_,_), _).


                 /*******************************
                 *         CREATE JOURNAL       *
                 *******************************/

:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id
 rdf_persistency(+DB, +Bool)
Specify whether a database is persistent. Switching to false deletes the persistent state. Switching to true creates it.
rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    fail.
rdf_persistency(DB, false) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, persistency)),
        delete_db(DB)
    ).
rdf_persistency(DB, true) :-
    (   retract(blocked_db(DB, persistency))
    ->  create_db(DB)
    ;   true
    ).
 rdf_db:property_of_graph(?Property, +Graph) is nondet
Extend rdf_graph_property/2 with new properties.
:- multifile
    rdf_db:property_of_graph/2.

rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).
 start_monitor is det
 stop_monitor is det
Start/stop monitoring the RDF database for changes and update the journal.
start_monitor :-
    rdf_monitor(monitor,
                [ -assert(load)
                ]).
stop_monitor :-
    rdf_monitor(monitor,
                [ -all
                ]).
 monitor(+Term) is semidet
Handle an rdf_monitor/2 callback to deal with persistency. Note that the monitor calls that come from rdf_db.pl that deal with database changes are serialized. They do come from different threads though.
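Each change is written as one term followed by a full stop and a newline, using format/3 with ~q so that atoms such as IRIs are quoted and the line can be read back with read/2. The hypothetical journal_line/2 below renders such a record to a string to show the exact text.

```prolog
% journal_line(+Term, -Line): the text monitor/1 would write for Term.
journal_line(Term, Line) :-
    format(string(Line), '~q.~n', [Term]).
```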
monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).
                                        % TBD: Remove empty directories?

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
monitor_transaction(log(_), end(N)) :-
    check_nested(N),
    retract(transaction_message(N, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
    end_transactions(DBs, N).
monitor_transaction(log(Msg, DB), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(N)) :-
    monitor_transaction(log(Msg), end(N)).
 check_nested(+Level) is semidet
True if we must log this transaction. This is always the case for toplevel transactions. Nested transactions are only logged if log_nested_transactions(true) is defined.
check_nested(0) :- !.
check_nested(_) :-
    rdf_option(log_nested_transactions(true)).
 open_transaction(+DB, +Fd) is det
Add a begin(Id, Level, Time, Message) term if a transaction involves DB. Id is an incremental integer, where each database has its own counter. Level is the nesting level, Time a floating point timestamp (rounded to two decimals) and Message the message provided as argument to the log transaction.
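For example, a top-level logged transaction with message 'add label', started at time 10.126, yields a begin record whose time is rounded to two decimals as in open_transaction/2. The hypothetical begin_line/5 below reproduces only that formatting step; it assumes the default arithmetic flags, under which dividing two integers that do not divide evenly yields a float.

```prolog
% begin_line(+Id, +Nesting, +Time, +Msg, -Line)
% Hypothetical: the begin record open_transaction/2 writes to a journal.
begin_line(Id, Nesting, Time, Msg, Line) :-
    RoundedTime is round(Time*100)/100,    % keep two decimals
    format(string(Line), '~q.~n', [begin(Id, Nesting, RoundedTime, Msg)]).
```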
open_transaction(DB, Fd) :-
    transaction_message(N, Time, Msg),
    !,
    (   transaction_db(N, DB, _)
    ->  true
    ;   next_transaction_id(DB, Id),
        assert(transaction_db(N, DB, Id)),
        RoundedTime is round(Time*100)/100,
        format(Fd, '~q.~n', [begin(Id, N, RoundedTime, Msg)])
    ).
open_transaction(_,_).
 next_transaction_id(+DB, -Id) is det
Id is the number to use for the next logged transaction on DB. Transactions in each named graph are numbered in sequence. The Id of the last transaction is found by the 2nd clause, which starts reading 1 KB from the end of the journal and doubles this offset on each failure.
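With Size the journal size in bytes, the second clause of iterative_expand/3 tries offsets of 2^10, 2^11, ... bytes before the end of the file, up to floor(log2(Size)), and stops at the first section in which it finds an end/3 record with a positive Id. If none is found, or the file is smaller than 1 KB, the third clause scans the whole file. The hypothetical probe_offsets/2 below lists the offsets that would be tried.

```prolog
% probe_offsets(+Size, -Offsets): seek offsets (relative to end of file)
% tried by iterative_expand/3 for a journal of Size bytes.
probe_offsets(Size, Offsets) :-
    Size > 0,
    Max is floor(log(Size)/log(2)),
    findall(Offset,
            ( between(10, Max, Step),
              Offset is -(1<<Step)
            ),
            Offsets).
```

An empty list means the fast search is skipped and the whole file is read.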
:- dynamic
    current_transaction_id/2.

next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    db_file(Journal, File),         % resolve Journal inside the DB directory
    exists_file(File),
    !,
    size_file(File, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).

iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
    Max is floor(log(Size)/log(2)),
    between(10, Max, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 10),                   % records are line-based
    read(In, T0),
    last_transaction_id(T0, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-        % Scan the whole file
    seek(In, 0, bof, _),
    read(In, T0),
    last_transaction_id(T0, In, 0, Last).

last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
 end_transactions(+DBs:list(atom:id), +Nesting) is det
End a transaction that affected the given list of databases. We write the list of other affected databases as an argument to the end-term to facilitate fast finding of the related transactions.

In each database, the transaction is ended with a term end(Id, Nesting, Others), where Id and Nesting are the transaction identifier and nesting (see open_transaction/2) and Others is a list of DB:Id, indicating other databases affected by the transaction.
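For instance, if a logged transaction at nesting level 0 touched graph g1 as transaction 3 and graph g2 as transaction 7, each journal gets an end record pointing at the other one. The hypothetical end_lines/3 below computes those records with the same select/3 and format/3 calls as end_transactions/3, but returns them instead of writing them to the journals.

```prolog
% end_lines(+DBs, +Nesting, -Lines): DB-Line pairs, one end record per DB.
end_lines(DBs, Nesting, Lines) :-
    findall(DB-Line,
            ( member(DB:Id, DBs),
              once(select(DB:Id, DBs, Others)),   % the other graphs
              format(string(Line), 'end(~q, ~q, ~q).~n', [Id, Nesting, Others])
            ),
            Lines).
```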

end_transactions(DBs, N) :-
    end_transactions(DBs, DBs, N).

end_transactions([], _, _).
end_transactions([DB:Id|T], DBs, N) :-
    journal_fd(DB, Fd),
    once(select(DB:Id, DBs, Others)),
    format(Fd, 'end(~q, ~q, ~q).~n', [Id, N, Others]),
    sync_journal(DB, Fd),
    end_transactions(T, DBs, N).
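For a transaction that touched the databases a and b, each journal receives its own end/3 record listing the other database. The hypothetical helper end_record/4 below renders that record as a string instead of writing it to a journal stream; it is a sketch for illustration only.

```prolog
% end_record(+DBs, +Nesting, +DB, -Line): hypothetical helper that
% formats the end/3 record end_transactions/3 writes to DB's journal.
end_record(DBs, Nesting, DB, Line) :-
    once(select(DB:Id, DBs, Others)),   % Others: the remaining DB:Id pairs
    format(string(Line), 'end(~q, ~q, ~q).~n', [Id, Nesting, Others]).
```

With DBs = [a:1, b:7] and nesting 0, the journal of a receives end(1, 0, [b:7]). and the journal of b receives end(7, 0, [a:1]).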
 sync_loaded_graphs(+Graphs)
Called after a binary triple file has been loaded that added triples to the given graphs. Each of these graphs is saved to a new snapshot using create_db/1.
sync_loaded_graphs(Graphs) :-
    maplist(create_db, Graphs).


                 /*******************************
                 *         JOURNAL FILES        *
                 *******************************/
 journal_fd(+DB, -Stream) is det
Get an open stream to a journal. If the journal is not open, old journals are closed to satisfy the max_open_journals option. Then the journal is opened in append mode and a start([time(Now)]) record is written. Journal files are always encoded as UTF-8, for portability as well as to ensure full coverage of Unicode.
journal_fd(DB, Fd) :-
    source_journal_fd(DB, Fd),
    !.
journal_fd(DB, Fd) :-
    with_mutex(rdf_journal_file,
               journal_fd_(DB, Out)),
    Fd = Out.

journal_fd_(DB, Fd) :-
    source_journal_fd(DB, Fd),
    !.
journal_fd_(DB, Fd) :-
    limit_fd_pool,
    db_files(DB, _Snapshot, Journal),
    open_db(Journal, append, Fd,
            [ close_on_abort(false)
            ]),
    time_stamp(Now),
    format(Fd, '~q.~n', [start([time(Now)])]),
    assert(source_journal_fd(DB, Fd)).              % new one at the end
 limit_fd_pool is det
Limit the number of open journals to the max_open_journals option (default 10). Note that calls from rdf_monitor/2 are issued in different threads, but as they are part of write operations they are fully synchronised.
limit_fd_pool :-
    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
    !,
    (   rdf_option(max_open_journals(Max))
    ->  true
    ;   Max = 10
    ),
    Close is N - Max,
    forall(between(1, Close, _),
           close_oldest_journal).
limit_fd_pool.

close_oldest_journal :-
    source_journal_fd(DB, _Fd),
    !,
    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
    close_journal(DB).
close_oldest_journal.
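The journal pool is a FIFO kept in the clause order of source_journal_fd/2: journal_fd_/2 appends new journals with assert/1, and close_oldest_journal/0 closes the journal of the first clause. A self-contained sketch of the same idea on a hypothetical dynamic predicate open_entry/1 follows. Unlike limit_fd_pool/0, which closes N - Max journals before a new one is opened, this sketch frees one extra slot so the pool never holds more than Max entries after the addition.

```prolog
:- dynamic open_entry/1.            % hypothetical pool, oldest entry first

% add_entry(+Key, +Max): hypothetical helper that evicts the oldest
% entries so that, after adding Key, at most Max entries remain.
add_entry(Key, Max) :-
    aggregate_all(count, open_entry(_), N),
    Close is N - Max + 1,           % =< 0 if there is still room
    forall(between(1, Close, _), close_oldest),
    assertz(open_entry(Key)).       % newest entry goes to the end

% close_oldest: remove the first (oldest) entry, if any.
close_oldest :-
    retract(open_entry(_)),
    !.
close_oldest.
```

Adding the keys 1 to 12 with Max = 10 leaves the entries 3 to 12.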
 sync_journal(+DB, +Fd)
Sync the journal represented by DB and stream Fd. If DB is involved in a transaction, there is no point in flushing until the end of the transaction.
sync_journal(DB, _) :-
    transaction_db(_, DB, _),
    !.
sync_journal(_, Fd) :-
    flush_output(Fd).
 close_journal(+DB) is det
Close the journal associated with DB if it is open, writing an end([time(Now)]) record before closing it.
close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    (   retract(source_journal_fd(DB, Fd))
    ->  time_stamp(Now),
        format(Fd, '~q.~n', [end([time(Now)])]),
        close(Fd, [force(true)])
    ;   true
    ).
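Because journal_fd_/2 writes a start/1 record when it opens a journal and close_journal_/1 writes an end/1 record before closing it, each period during which a journal is open is framed by two time-stamped records. The hypothetical sketch below (append_session/2 and stamp/1 are illustrative names, rec/1 is a placeholder record) appends one such framed session to a file, opened in append mode as UTF-8 as open_db/4 does.

```prolog
% stamp(-Int): current time in whole seconds, like time_stamp/1.
stamp(Int) :-
    get_time(Now),
    Int is round(Now).

% append_session(+File, +Records): hypothetical helper that appends
% start/1, the given Records and end/1, one term per line.
append_session(File, Records) :-
    setup_call_cleanup(
        open(File, append, Out, [encoding(utf8)]),
        ( stamp(T0),
          format(Out, '~q.~n', [start([time(T0)])]),
          forall(member(R, Records), format(Out, '~q.~n', [R])),
          stamp(T1),
          format(Out, '~q.~n', [end([time(T1)])])
        ),
        close(Out)).
```

Appending two sessions to the same file yields two consecutive start/records/end groups, because append mode never truncates the file.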
 close_journals
Close all open journals. Errors raised while closing a journal are printed as error messages rather than propagated.
close_journals :-
    forall(source_journal_fd(DB, _),
           catch(close_journal(DB), E,
                 print_message(error, E))).
 create_db(+Graph)
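Save Graph to its snapshot file and close and delete its journal. The snapshot is first written to a file with .new appended to its name, which replaces the old snapshot only if saving succeeded. If Graph contains no triples, its snapshot and journal are deleted instead (see delete_db/1).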
create_db(Graph) :-
    \+ rdf(_,_,_,Graph),
    !,
    debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
    delete_db(Graph).
create_db(Graph) :-
    debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
    close_journal(Graph),
    db_abs_files(Graph, Snapshot, Journal),
    atom_concat(Snapshot, '.new', NewSnapshot),
    (   catch(( create_directory_levels(Snapshot),
                rdf_save_db(NewSnapshot, Graph)
              ), Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  (   exists_file(Journal)
        ->  delete_file(Journal)
        ;   true
        ),
        rename_file(NewSnapshot, Snapshot),
        debug(rdf_persistency, 'Saved Graph ~w', [Graph])
    ;   catch(delete_file(NewSnapshot), _, true)
    ).
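create_db/1 never overwrites a snapshot in place: it saves to a file with .new appended and renames that file over the old snapshot only after saving succeeded, removing the partial file otherwise. The same pattern as a hypothetical stand-alone sketch (save_atomically/2 is an illustrative name; it writes plain Prolog terms instead of calling rdf_save_db/2, and it fails after cleaning up where create_db/1 succeeds):

```prolog
% save_atomically(+File, +Terms): hypothetical helper; File is only
% replaced once the complete new content has been written.
save_atomically(File, Terms) :-
    atom_concat(File, '.new', Tmp),
    (   catch(setup_call_cleanup(
                  open(Tmp, write, Out, [encoding(utf8)]),
                  forall(member(T, Terms), format(Out, '~q.~n', [T])),
                  close(Out)),
              Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  rename_file(Tmp, File)      % swap in the new version
    ;   catch(delete_file(Tmp), _, true),
        fail                        % unlike create_db/1, report failure
    ).
```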
 delete_db(+DB)
Remove snapshot and journal file for DB.
delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    close_journal_(DB),
    db_abs_files(DB, Snapshot, Journal),
    !,
    (   exists_file(Journal)
    ->  delete_file(Journal)
    ;   true
    ),
    (   exists_file(Snapshot)
    ->  delete_file(Snapshot)
    ;   true
    ).
delete_db_(_).

                 /*******************************
                 *             LOCKING          *
                 *******************************/
 lock_db(+Dir)
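Lock the database directory Dir by opening the file Dir/lock with an exclusive write lock. The lock file records the time, process id and host of the process holding the lock. If Dir is already locked, a permission_error(lock, rdf_db, Dir) exception is raised.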
lock_db(Dir) :-
    lockfile(Dir, File),
    catch(open(File, update, Out, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, PID)
    ->  true
    ;   PID = 0                     % TBD: Fix in Prolog
    ),
    time_stamp(Now),
    gethostname(Host),
    format(Out, '/* RDF Database is in use */~n~n', []),
    format(Out, '~q.~n', [ locked([ time(Now),
                                    pid(PID),
                                    host(Host)
                                  ])
                         ]),
    flush_output(Out),
    set_end_of_stream(Out),
    assert(rdf_lock(Dir, lock(Out, File))),
    at_halt(unlock_db(Dir)).

locked_error(lock, Dir) :-
    lockfile(Dir, File),
    (   catch(read_file_to_terms(File, Terms, []), _, fail),
        Terms = [locked(Args)]
    ->  Context = rdf_locked(Args)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).
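  unlock_db(+Dir) is det
  unlock_db(+Stream, +File) is det
Release the lock on Dir obtained with lock_db/1 by closing the lock stream and deleting the lock file. unlock_db/1 succeeds without effect if Dir is not locked.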
unlock_db(Dir) :-
    retract(rdf_lock(Dir, lock(Out, File))),
    !,
    unlock_db(Out, File).
unlock_db(_).

unlock_db(Out, File) :-
    close(Out),
    delete_file(File).
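The lock file written by lock_db/1 is itself a small Prolog file: a comment followed by a single locked/1 term, which locked_error/2 reads back to report who holds the lock. A hypothetical sketch of that round trip (write_lock_record/1 and lock_owner/2 are illustrative names; the sketch omits the lock(write) option and the error handling of lock_db/1):

```prolog
% write_lock_record(+File): hypothetical helper that writes a lock
% record in the same format as lock_db/1.
write_lock_record(File) :-
    get_time(T),
    Now is round(T),
    current_prolog_flag(pid, PID),
    gethostname(Host),
    setup_call_cleanup(
        open(File, write, Out, [encoding(utf8)]),
        ( format(Out, '/* RDF Database is in use */~n~n', []),
          format(Out, '~q.~n', [locked([time(Now), pid(PID), host(Host)])])
        ),
        close(Out)).

% lock_owner(+File, -PID): read the record back, as locked_error/2 does.
lock_owner(File, PID) :-
    read_file_to_terms(File, [locked(Args)], []),
    memberchk(pid(PID), Args).
```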

                 /*******************************
                 *           FILENAMES          *
                 *******************************/

lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, /, lock], LockFile).

directory_levels(Levels) :-
    rdf_option(directory_levels(Levels)),
    !.
directory_levels(2).

db_file(Base, File) :-
    rdf_directory(Dir),
    directory_levels(Levels),
    db_file(Dir, Base, Levels, File).

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, File).

open_db(Base, Mode, Stream, Options) :-
    db_file(Base, File),
    create_directory_levels(File),
    open(File, Mode, Stream, [encoding(utf8)|Options]).

create_directory_levels(_File) :-
    rdf_option(directory_levels(0)),
    !.
create_directory_levels(File) :-
    file_directory_name(File, Dir),
    make_directory_path(Dir).

exists_db(Base) :-
    db_file(Base, File),
    exists_file(File).
 dir_levels(+File, +Levels, ?Segments, ?Tail) is det
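Create a list of intermediate directory names for File as the difference list Segments/Tail. The names are successive pairs of hexadecimal digits taken from the hash computed by rdf_atom_md5/3, one pair per level. If Levels is 0, Segments is unified with Tail.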
dir_levels(_, 0, Segments, Segments) :- !.
dir_levels(File, Levels, Segments, Tail) :-
    rdf_atom_md5(File, 1, Hash),
    create_dir_levels(Levels, 0, Hash, Segments, Tail).

create_dir_levels(0, _, _, Segments, Segments) :- !.
create_dir_levels(N, S, Hash, [S1|Segments0], Tail) :-
    sub_atom(Hash, S, 2, _, S1),
    S2 is S+2,
    N2 is N-1,
    create_dir_levels(N2, S2, Hash, Segments0, Tail).
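dir_levels/4 spreads the files over subdirectories named after successive pairs of hexadecimal digits of a hash of the base name. The hypothetical sketch below (hashed_path/4 and hash_segments/5 are illustrative names) shows the resulting path. It uses md5_hash/3 from library(md5) as a stand-in for rdf_atom_md5/3 and assumes that both yield the same hexadecimal MD5 digest for a single hashing pass.

```prolog
:- use_module(library(md5)).   % md5_hash/3, stand-in for rdf_atom_md5/3

% hashed_path(+Dir, +Base, +Levels, -Path): hypothetical helper that
% builds Dir/XX/YY/.../Base, with Levels two-digit hash segments.
hashed_path(Dir, Base, Levels, Path) :-
    md5_hash(Base, Hash, []),
    hash_segments(Levels, 0, Hash, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, Path).

% hash_segments(+N, +Start, +Hash, -Segments, +Tail): difference list
% of N two-character slices of Hash, starting at offset Start.
hash_segments(0, _, _, Tail, Tail) :- !.
hash_segments(N, S, Hash, [Seg|Segs], Tail) :-
    sub_atom(Hash, S, 2, _, Seg),
    S2 is S + 2,
    N2 is N - 1,
    hash_segments(N2, S2, Hash, Segs, Tail).
```

Since the MD5 digest of abc is 900150983cd24fb0d6963f7d28e17f72, hashed_path('/db', abc, 2, P) gives P = '/db/90/01/abc'.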
 db_files(+DB, -Snapshot, -Journal)
db_files(-DB, +Snapshot, -Journal)
db_files(-DB, -Snapshot, +Journal)
True if named graph DB is represented by the files Snapshot and Journal. The filenames are local to the directory representing the store.
db_files(DB, Snapshot, Journal) :-
    nonvar(DB),
    !,
    rdf_db_to_file(DB, Base),
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal).
db_files(DB, Snapshot, Journal) :-
    nonvar(Snapshot),
    !,
    atom_concat(Base, '.trp', Snapshot),
    atom_concat(Base, '.jrn', Journal),
    rdf_db_to_file(DB, Base).
db_files(DB, Snapshot, Journal) :-
    nonvar(Journal),
    !,
    atom_concat(Base, '.jrn', Journal),
    atom_concat(Base, '.trp', Snapshot),
    rdf_db_to_file(DB, Base).

db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, Snapshot0, Journal0),
    db_file(Snapshot0, Snapshot),
    db_file(Journal0, Journal).
 rdf_journal_file(+Graph, -File) is semidet
rdf_journal_file(-Graph, -File) is nondet
True if File is the name of the existing journal file for Graph.
rdf_journal_file(Graph, Journal) :-
    (   var(Graph)
    ->  rdf_graph(Graph)
    ;   true
    ),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).
 rdf_snapshot_file(+Graph, -File) is semidet
rdf_snapshot_file(-Graph, -File) is nondet
True if File is the name of the existing snapshot file for Graph.
rdf_snapshot_file(Graph, Snapshot) :-
    (   var(Graph)
    ->  rdf_graph(Graph)    % also pick the empty graphs
    ;   true
    ),
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).
 rdf_db_to_file(+DB, -File) is det
rdf_db_to_file(-DB, +File) is det
Translate between the database name (often a file or URL) and the name we store in the directory. We keep a cache for two reasons: speed and, much more importantly, the fact that the W3C standards do not guarantee that the raw --> encoded mapping provided by www_form_encode/2 is unique.
rdf_db_to_file(DB, File) :-
    file_base_db(File, DB),
    !.
rdf_db_to_file(DB, File) :-
    url_to_filename(DB, File),
    assert(file_base_db(File, DB)).
 url_to_filename(+URL, -FileName) is det
url_to_filename(-URL, +FileName) is det
Turn a valid URL into a filename. Earlier versions used www_form_encode/2, but this can produce characters that are not valid in filenames. We use the same encoding as www_form_encode/2, but with our own rules for allowed characters. The only requirement is that the result contains no characters that are special in filenames. The current encoding uses US-ASCII alphanumeric characters, _, % and + (which encodes a space).
url_to_filename(URL, FileName) :-
    atomic(URL),
    !,
    atom_codes(URL, Codes),
    phrase(url_encode(EncCodes), Codes),
    atom_codes(FileName, EncCodes).
url_to_filename(URL, FileName) :-
    uri_encoded(path, URL, FileName).

url_encode([0'+|T]) -->
    " ",
    !,
    url_encode(T).
url_encode([C|T]) -->
    alphanum(C),
    !,
    url_encode(T).
url_encode([C|T]) -->
    no_enc_extra(C),
    !,
    url_encode(T).
url_encode(Enc) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", Codes),
      append(Codes, T, Enc)
    },
    url_encode(T).
url_encode([]) -->
    eos,
    !.
url_encode([0'%,D1,D2|T]) -->
    [C],
    { Dv1 is (C>>4 /\ 0xf),
      Dv2 is (C /\ 0xf),
      code_type(D1, xdigit(Dv1)),
      code_type(D2, xdigit(Dv2))
    },
    url_encode(T).

eos([], []).

alphanum(C) -->
    [C],
    { C < 128,                      % US-ASCII
      code_type(C, alnum)
    }.

no_enc_extra(0'_) --> "_".
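The encoding rules of url_encode//1 can be tried in isolation with the hypothetical sketch below (sketch_url_to_filename/2, enc//1, at_end//0 and hex_digit/2 are illustrative names). It follows the same clause order, but replaces the code_type/2 lookup of hex digits with an explicit conversion that always yields uppercase digits. Like the original, it encodes every remaining character as % followed by the two hex digits of the low byte of its code.

```prolog
% sketch_url_to_filename(+URL, -FileName): hypothetical stand-alone
% version of the url_encode//1 rules above.
sketch_url_to_filename(URL, FileName) :-
    atom_codes(URL, Codes),
    phrase(enc(Enc), Codes),
    atom_codes(FileName, Enc).

enc([0'+|T]) --> " ", !, enc(T).                 % space -> +
enc([C|T]) --> [C], { C < 128, code_type(C, alnum) }, !, enc(T).
enc([0'_|T]) --> "_", !, enc(T).                 % _ is kept as is
enc(Enc) -->                                     % any newline -> %0D%0A
    ( "\r\n" ; "\n" ), !,
    { append(`%0D%0A`, T, Enc) },
    enc(T).
enc([]) --> at_end, !.
enc([0'%, D1, D2|T]) -->                         % other codes -> %XX
    [C],
    { Hi is (C >> 4) /\ 0xf,
      Lo is C /\ 0xf,
      hex_digit(Hi, D1),
      hex_digit(Lo, D2)
    },
    enc(T).

at_end([], []).

hex_digit(V, D) :-
    (   V < 10
    ->  D is 0'0 + V
    ;   D is 0'A + V - 10
    ).
```

For instance, 'http://a b' becomes 'http%3A%2F%2Fa+b'.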


                 /*******************************
                 *             REINDEX          *
                 *******************************/
 reindex_db(+Dir, +Levels)
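Reindex the database in Dir to use Levels levels of intermediate directories. Database files (see db_extension/1) that are not at depth Levels are moved to the location computed by db_file/4, after which directories that have become empty are removed.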
reindex_db(Dir, Levels) :-
    directory_files(Dir, Files),
    reindex_files(Files, Dir, '.', 0, Levels),
    remove_empty_directories(Files, Dir).

reindex_files([], _, _, _, _).
reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
    nofollow(Nofollow),
    !,
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, File),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, File, DBFile),
    directory_file_path(Dir, DBFile, OldPath),
    db_file(Dir, File, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, D, SubD),
    directory_file_path(Dir, SubD, AbsD),
    exists_directory(AbsD),
    \+ read_link(AbsD, _, _),      % Do not follow links
    !,
    directory_files(AbsD, SubFiles),
    CLevel2 is CLevel + 1,
    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Files, Dir, Prefix, CLevel, Levels).


remove_empty_directories([], _).
remove_empty_directories([File|Files], Dir) :-
    \+ nofollow(File),
    directory_file_path(Dir, File, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content),
    exclude(nofollow, Content, RealContent),
    (   RealContent == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   remove_empty_directories(RealContent, Path)
    ),
    remove_empty_directories(Files, Dir).
remove_empty_directories([_|Files], Dir) :-
    remove_empty_directories(Files, Dir).


                 /*******************************
                 *            PREFIXES          *
                 *******************************/

save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
                       write_prefixes(Out),
                       close(Out)).

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
 load_prefixes(+RDFDBDir) is det
If the file RDFDBDir/prefixes.db exists, load the prefixes. The prefixes are registered using rdf_register_ns/3. Possible errors because the prefix definitions have changed are printed as warnings, retaining the old definition. Note that changing prefixes generally requires reloading all RDF from the source.
load_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    (   exists_file(PrefixFile)
    ->  setup_call_cleanup(open(PrefixFile, read, In, [encoding(utf8)]),
                           read_prefixes(In),
                           close(In))
    ;   true
    ).

read_prefixes(Stream) :-
    read_term(Stream, T0, []),
    read_prefixes(T0, Stream).

read_prefixes(end_of_file, _) :- !.
read_prefixes(prefix(Alias, URI), Stream) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []), E,
          print_message(warning, E)),
    read_term(Stream, T, []),
    read_prefixes(T, Stream).
read_prefixes(Term, _) :-
    domain_error(prefix_term, Term).
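prefixes.db is a plain Prolog file with one prefix(Alias, URI) term per line, written with ~q so that load_prefixes/1 can read it back with read_term/3. A hypothetical round-trip sketch on a string instead of a file (prefixes_to_string/2, string_to_prefixes/2 and read_all/2 are illustrative names; the sketch returns Alias-URI pairs instead of calling rdf_register_ns/3):

```prolog
% prefixes_to_string(+Pairs, -String): write Alias-URI pairs in the
% prefixes.db format, header comment included.
prefixes_to_string(Pairs, String) :-
    with_output_to(string(String),
                   ( format('% Snapshot of defined RDF prefixes~n~n', []),
                     forall(member(Alias-URI, Pairs),
                            format('prefix(~q, ~q).~n', [Alias, URI]))
                   )).

% string_to_prefixes(+String, -Pairs): read the terms back; the
% comment line is skipped by the reader.
string_to_prefixes(String, Pairs) :-
    setup_call_cleanup(open_string(String, In),
                       read_all(In, Pairs),
                       close(In)).

read_all(In, Pairs) :-
    read_term(In, Term, []),
    (   Term == end_of_file
    ->  Pairs = []
    ;   Term = prefix(Alias, URI)
    ->  Pairs = [Alias-URI|Rest],
        read_all(In, Rest)
    ;   domain_error(prefix_term, Term)
    ).
```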


                 /*******************************
                 *              UTIL            *
                 *******************************/
 mkdir(+Directory)
Create a directory if it does not already exist.
mkdir(Directory) :-
    exists_directory(Directory),
    !.
mkdir(Directory) :-
    make_directory(Directory).
 time_stamp(-Integer)
Unify Integer with the current time in seconds since the epoch (see get_time/1), rounded to the nearest integer.
time_stamp(Int) :-
    get_time(Now),
    Int is round(Now).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3,
    prolog:message_context/3.

prolog:message(rdf(Term)) -->
    message(Term).

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

silent_message(_Action) --> [].

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].


prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Time), Args),
      memberchk(pid(Pid), Args),
      format_time(string(S), '%+', Time)
    },
    [ nl,
      'locked at ~s by process id ~w'-[S,Pid]
    ]