1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2006-2015, University of Amsterdam 7 VU University Amsterdam 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36:- module(rdf_persistency, 37 [ rdf_attach_db/2, % +Directory, +Options 38 rdf_detach_db/0, % +Detach current Graph 39 rdf_current_db/1, % -Directory 40 rdf_persistency/2, % +Graph, +Bool 41 rdf_flush_journals/1, % +Options 42 rdf_persistency_property/1, % ?Property 43 rdf_journal_file/2, % ?Graph, ?JournalFile 44 rdf_snapshot_file/2, % ?Graph, ?SnapshotFile 45 rdf_db_to_file/2 % ?Graph, ?FileBase 46 ]). 47:- use_module(library(semweb/rdf_db)). 
48:- use_module(library(filesex)). 49:- use_module(library(lists)). 50:- use_module(library(uri)). 51:- use_module(library(debug)). 52:- use_module(library(option)). 53:- use_module(library(error)). 54:- use_module(library(thread)). 55:- use_module(library(apply)).
% Per-session state of the persistency layer.  volatile/1 keeps these
% predicates out of saved states; dynamic/1 allows assert/retract.
:- volatile
    rdf_directory/1,
    rdf_lock/2,
    rdf_option/1,
    source_journal_fd/2,
    file_base_db/2.
:- dynamic
    rdf_directory/1,                % Absolute path
    rdf_lock/2,                     % Dir, Lock
    rdf_option/1,                   % Defined options
    source_journal_fd/2,            % DB, JournalFD
    file_base_db/2.                 % FileBase, DB

% no_agc/1 calls its argument as a goal, so the argument specifier must
% be 0 (a callable goal, module-qualified on entry).
:- meta_predicate
    no_agc(0).

% Options accepted by rdf_attach_db/2.  These must agree with
% option_type/2, which also accepts directory_levels/1.
:- predicate_options(rdf_attach_db/2, 2,
                     [ access(oneof([read_write,read_only])),
                       concurrency(positive_integer),
                       max_open_journals(positive_integer),
                       directory_levels(positive_integer),
                       silent(oneof([true,false,brief])),
                       log_nested_transactions(boolean)
                     ]).
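%   Options:
%
%     - access(+AccessMode)
%       One of auto (default), read_write or read_only.
%       Read-only access implies that the RDF store is not locked.
%       It is read at startup and all modifications to the data are
%       temporary. The default auto mode is read_write if the
%       directory is writeable and the lock can be acquired.
%       Otherwise it reverts to read_only.
%     - concurrency(+Jobs)
%       Number of threads used to load the initial database.
%       If not provided, the value of the Prolog flag cpu_count
%       is used.
%     - max_open_journals(+Count)
%       Maximum number of journal files kept open (default 10).
%     - directory_levels(+Count)
%       Number of levels of intermediate directories used to store
%       the graph files (default 2).
%     - silent(+BoolOrBrief)
%       If true, do not print informational messages; if brief,
%       show minimal feedback; if false, show full progress
%       messages. If the option is absent, brief is used (or true
%       when the Prolog flag verbose is silent).
%     - log_nested_transactions(+Boolean)
%       If true, nested log transactions are added to the
%       journal information. By default (false), no log-term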
%       is added for nested transactions.

% Explicit read-only access: DirSpec must resolve to a readable
% directory; the store is attached without taking the lock.
rdf_attach_db(DirSpec, Options) :-
    option(access(read_only), Options),
    !,
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(read),
                         file_type(directory)
                       ]),
    rdf_attach_db_ro(Directory, Options).
% Explicit read-write access: errors propagate to the caller.
rdf_attach_db(DirSpec, Options) :-
    option(access(read_write), Options),
    !,
    rdf_attach_db_rw(DirSpec, Options).
% Auto mode for an existing directory.  Try read-write if the directory
% is writable, falling back to read-only if the lock is held elsewhere.
% If the directory is not writable, attach read-only directly.
rdf_attach_db(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(exist),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   access_file(Directory, write)
    ->  catch(rdf_attach_db_rw(Directory, Options), E, true),
        (   var(E)
        ->  true
        ;   E = error(permission_error(lock, rdf_db, _), _)
        ->  print_message(warning, E),
            print_message(warning, rdf(read_only)),
            rdf_attach_db(DirSpec, [access(read_only)|Options])
        ;   throw(E)
        )
    ;   print_message(warning,
                      error(permission_error(write, directory, Directory))),
        print_message(warning, rdf(read_only)),
        % Record the access mode so that
        % rdf_persistency_property(access(read_only)) reports it.
        rdf_attach_db_ro(Directory, [access(read_only)|Options])
    ).
% Auto mode when DirSpec is not an existing directory: try to create
% and attach it read-write; on any error warn and retry read-only.
rdf_attach_db(DirSpec, Options) :-
    catch(rdf_attach_db_rw(DirSpec, Options), E, true),
    (   var(E)
    ->  true
    ;   print_message(warning, E),
        print_message(warning, rdf(read_only)),
        rdf_attach_db(DirSpec, [access(read_only)|Options])
    ).


%   rdf_attach_db_rw(+DirSpec, +Options)
%
%   Attach in read-write mode:
%     1. DirSpec resolves to a writable directory.  If it is not the
%        current database, detach the current one, lock the directory,
%        record the options and load the content with the monitor off.
%     2. Some solution of DirSpec is, or can be created as, a writable
%        directory: re-enter rdf_attach_db/2 with its absolute path.
%     3. Otherwise raise an existence or permission error.
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ access(write),
                         file_type(directory),
                         file_errors(fail)
                       ]),
    !,
    (   rdf_directory(Directory)
    ->  true                        % update settings?
    ;   rdf_detach_db,
        mkdir(Directory),
        lock_db(Directory),
        assert(rdf_directory(Directory)),
        assert_options(Options),
        stop_monitor,               % make sure not to register load
        no_agc(load_db),
        at_halt(rdf_detach_db),
        start_monitor
    ).
rdf_attach_db_rw(DirSpec, Options) :-
    absolute_file_name(DirSpec,
                       Directory,
                       [ solutions(all)
                       ]),
    (   exists_directory(Directory)
    ->  access_file(Directory, write)
    ;   catch(make_directory(Directory), _, fail)
    ),
    !,
    rdf_attach_db(Directory, Options).
rdf_attach_db_rw(DirSpec, _) :-         % Generate an existence or
    absolute_file_name(DirSpec,         % permission error
                       Directory,
                       [ access(exist),
                         file_type(directory)
                       ]),
    permission_error(write, directory, Directory).
%   rdf_attach_db_ro(+Directory, +Options)
%
%   Attach Directory without locking it.  The change monitor is
%   stopped and not restarted, so modifications are not journalled.
rdf_attach_db_ro(Directory, Options) :-
    rdf_detach_db,
    assert(rdf_directory(Directory)),
    assert_options(Options),
    stop_monitor,                   % make sure not to register load
    no_agc(load_db).


%   assert_options(+Options)
%
%   Type-check each recognised option with its option_type/2 goal and
%   record it as an rdf_option/1 fact.  Unknown options are ignored.
%
%   @error Type or domain error from must_be/2 if a recognised option
%          has an invalid value.
assert_options([]).
assert_options([H|T]) :-
    (   option_type(H, Check)
    ->  call(Check),                % raises an error on a bad value
        assert(rdf_option(H))
    ;   true                        % ignore options we do not understand
    ),
    assert_options(T).

option_type(concurrency(X),             must_be(positive_integer, X)).
option_type(max_open_journals(X),       must_be(positive_integer, X)).
option_type(directory_levels(X),        must_be(positive_integer, X)).
option_type(silent(X),                  must_be(oneof([true,false,brief]), X)).
option_type(log_nested_transactions(X), must_be(boolean, X)).
option_type(access(X),                  must_be(oneof([read_write,
                                                       read_only]), X)).
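%   True when Property is a property of the current persistent database.
%   rdf_persistency_property(access(read_only)) is true iff the database
%   is mounted in read-only mode. In addition, the following property is
%   supported:
%
%     - directory(-Dir)
%       Dir is the directory in which the database is stored.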
%   rdf_persistency_property(?Property)
%
%   With an unbound Property, enumerate every known property.  With a
%   (partially) bound Property, succeed at most once.
rdf_persistency_property(Property) :-
    (   var(Property)
    ->  rdf_persistency_property_(Property)
    ;   once(rdf_persistency_property_(Property))
    ).

% Properties are the recorded attach options plus the directory.
rdf_persistency_property_(Property) :-
    rdf_option(Property).
rdf_persistency_property_(Property) :-
    Property = directory(Dir),
    rdf_directory(Dir).
%   no_agc(:Goal)
%
%   Run Goal with the Prolog flag agc_margin set to 0, which disables
%   atom garbage collection.  The previous value of the flag is restored
%   once Goal has completed, failed or raised an exception.
no_agc(Goal) :-
    current_prolog_flag(agc_margin, Old),
    setup_call_cleanup(
        set_prolog_flag(agc_margin, 0),
        Goal,
        set_prolog_flag(agc_margin, Old)).
%   rdf_detach_db
%
%   Stop journalling, close all journals and, if a database directory
%   is attached, save the prefixes, drop per-database state and unlock
%   the directory.  Succeeds if nothing is attached.
rdf_detach_db :-
    debug(halt, 'Detaching RDF database', []),
    stop_monitor,
    close_journals,
    (   retract(rdf_directory(Dir))
    ->  detach_db_directory(Dir)
    ;   true
    ).

% Release the state belonging to the (just retracted) directory Dir.
detach_db_directory(Dir) :-
    debug(halt, 'DB Directory: ~w', [Dir]),
    save_prefixes(Dir),
    retractall(rdf_option(_)),
    retractall(source_journal_fd(_,_)),
    unlock_db(Dir).
%   rdf_current_db(-Directory)
%
%   Directory is the directory of the attached database (first one
%   recorded).  Fails if no database is attached.
rdf_current_db(Directory) :-
    once(rdf_directory(Dir)),
    Directory = Dir.
%   rdf_flush_journals(+Options)
%
%   Turn the journals of graphs into snapshots (see create_db/1).
%   Options:
%     - graph(+Graph)   Only flush Graph (default: all graphs).
%     - min_size(+KB)   Leave journals smaller than KB kilobytes alone.
rdf_flush_journals(Options) :-
    option(graph(Graph), Options, _),
    forall(rdf_graph(Graph),
           rdf_flush_journal(Graph, Options)).

rdf_flush_journal(Graph, Options) :-
    db_files(Graph, _SnapshotFile, JournalFile),
    db_file(JournalFile, File),     % JournalFile is only a base name
    (   \+ exists_file(File)
    ->  true
    ;   memberchk(min_size(KB), Options),
        size_file(File, Size),      % size of the resolved journal path
        Size / 1024 < KB
    ->  true
    ;   create_db(Graph)
    ).

                 /*******************************
                 *             LOAD             *
                 *******************************/
%   load_db
%
%   Load the attached database:
%     1. Load the saved prefixes.
%     2. Unload the graphs that have files in the store.
%     3. Load all snapshots, then all journals, using concurrency(Jobs)
%        worker threads.
%     4. Print a summary with the number of graphs, triples and the time
%        used.
load_db :-
    rdf_directory(Dir),
    concurrency(Jobs),
    cpu_stat_key(Jobs, StatKey),
    get_time(Wall0),
    statistics(StatKey, T0),
    load_prefixes(Dir),
    verbosity(Silent),
    find_dbs(Dir, Graphs, SnapShots, Journals),
    length(Graphs, GraphCount),
    maplist(rdf_unload_graph, Graphs),
    rdf_statistics(triples(Triples0)),
    load_sources(snapshots, SnapShots, Silent, Jobs),
    load_sources(journals, Journals, Silent, Jobs),
    rdf_statistics(triples(Triples1)),
    statistics(StatKey, T1),
    get_time(Wall1),
    T is T1 - T0,
    Wall is Wall1 - Wall0,
    Triples is Triples1 - Triples0,     % an integer, printed with ~D
    message_level(Silent, Level),
    print_message(Level, rdf(restore(attached(GraphCount, Triples, T/Wall)))).

%   load_sources(+Type, +Sources, +Silent, +Jobs)
%
%   Load the db/5 terms in Sources using at most Jobs threads.
load_sources(_, [], _, _) :- !.
load_sources(Type, Sources, Silent, Jobs) :-
    length(Sources, Count),
    RunJobs is min(Count, Jobs),
    print_message(informational, rdf(restoring(Type, Count, RunJobs))),
    make_goals(Sources, Silent, 1, Count, Goals),
    concurrent(RunJobs, Goals, []).
%   make_goals(+DBs, +Silent, +First, +Total, -Goals)
%
%   Goals is a list of load_source(DB, Silent, I, Total) goals, one per
%   DB, where I counts up from First.
make_goals(DBs, Silent, First, Total, Goals) :-
    foldl(source_goal(Silent, Total), DBs, Goals, First, _).

source_goal(Silent, Total, DB, load_source(DB, Silent, I, Total), I, Next) :-
    Next is I + 1.

%   verbosity(-Silent)
%
%   Silent comes from the silent/1 option if given.  Otherwise it is
%   true when the Prolog flag verbose is silent, and brief in all
%   other cases.
verbosity(Silent) :-
    (   rdf_option(silent(Silent))
    ->  true
    ;   current_prolog_flag(verbose, silent)
    ->  Silent = true
    ;   Silent = brief
    ).
%   concurrency(-Jobs)
%
%   Number of loader threads: the concurrency/1 option, else the Prolog
%   flag cpu_count if it is positive, else 1.
concurrency(Jobs) :-
    (   rdf_option(concurrency(Jobs))
    ->  true
    ;   current_prolog_flag(cpu_count, Jobs),
        Jobs > 0
    ->  true
    ;   Jobs = 1
    ).

%   cpu_stat_key(+Jobs, -Key)
%
%   With a single loader thread, the thread's cputime is enough.
%   Otherwise measure the CPU time of the whole process.
cpu_stat_key(1, cputime) :- !.
cpu_stat_key(_, process_cputime).
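%   find_dbs(+Dir, -Graphs, -SnapBySize, -JournalBySize)
%
%   Scan Dir for snapshot and journal files.  Graphs is the sorted list
%   of graphs that have files.  SnapBySize and JournalBySize are sorted
%   lists of terms db(Size, Ext, DB, DBFile, Depth), i.e. ordered by size.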
find_dbs(Dir, Graphs, SnapBySize, JournalBySize) :-
    directory_files(Dir, Entries),
    phrase(scan_db_files(Entries, Dir, '.', 0), Found),
    maplist(db_graph, Found, GraphList),
    sort(GraphList, Graphs),
    (   consider_reindex_db(Dir, Graphs, Found)
    ->  % the files were moved; scan the new layout
        find_dbs(Dir, Graphs, SnapBySize, JournalBySize)
    ;   partition(db_is_snapshot, Found, Snaps, Jrns),
        sort(Snaps, SnapBySize),
        sort(Jrns, JournalBySize)
    ).

%   consider_reindex_db(+Dir, +Graphs, +Found)
%
%   Succeeds after reindexing Dir if the files are at mixed depths, or
%   if the depth needed for the number of graphs exceeds the current
%   uniform depth.  Otherwise record the current depth as the
%   directory_levels option and fail.
consider_reindex_db(Dir, Graphs, Found) :-
    length(Graphs, GraphCount),
    GraphCount > 0,
    Wanted is floor(log(GraphCount)/log(256)),
    (   maplist(depth_db(Actual), Found)
    ->  % every file sits at depth Actual
        (   Wanted > Actual
        ->  true
        ;   retractall(rdf_option(directory_levels(_))),
            assertz(rdf_option(directory_levels(Actual))),
            fail
        )
    ;   true                        % files at mixed depths
    ),
    reindex_db(Dir, Wanted).

% Accessors for db(Size, Ext, DB, File, Depth) terms.
db_is_snapshot(DBTerm) :-
    arg(2, DBTerm, trp).

db_graph(DBTerm, Graph) :-
    arg(3, DBTerm, Graph).

db_file_name(DBTerm, File) :-
    arg(4, DBTerm, File).

depth_db(Depth, DBTerm) :-
    arg(5, DBTerm, Depth).
%   scan_db_files(+Files, +Dir, +Prefix, +Depth)//
%
%   Produces a term db(Size, Ext, DB, File, Depth) for all recognised RDF
%   database files.  File is the absolute path of the file (Dir joined
%   with its path relative to Dir) and Depth the number of sub-directory
%   levels below Dir at which it was found.  Symbolic links to
%   directories are not followed.

scan_db_files([], _, _, _) -->
    [].
scan_db_files([Nofollow|T], Dir, Prefix, Depth) -->
    { nofollow(Nofollow) },
    !,
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([File|T], Dir, Prefix, Depth) -->
    { file_name_extension(Base, Ext, File),
      db_extension(Ext),
      !,
      rdf_db_to_file(DB, Base),
      directory_file_path(Prefix, File, DBFile),
      directory_file_path(Dir, DBFile, AbsFile),
      size_file(AbsFile, Size)
    },
    [ db(Size, Ext, DB, AbsFile, Depth) ],
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([D|T], Dir, Prefix, Depth) -->
    { directory_file_path(Prefix, D, SubD),
      directory_file_path(Dir, SubD, AbsD),
      exists_directory(AbsD),
      \+ read_link(AbsD, _, _),     % Do not follow links
      !,
      directory_files(AbsD, SubFiles),
      SubDepth is Depth + 1
    },
    scan_db_files(SubFiles, Dir, SubD, SubDepth),
    scan_db_files(T, Dir, Prefix, Depth).
scan_db_files([_|T], Dir, Prefix, Depth) -->
    scan_db_files(T, Dir, Prefix, Depth).

% Directory entries that are never descended into.  They must be quoted:
% an unquoted . or .. is not a valid Prolog atom.
nofollow('.').
nofollow('..').

% File extensions of snapshots (trp) and journals (jrn).
db_extension(trp).
db_extension(jrn).

:- public load_source/4.            % called through make_goals/5

%   load_source(+DB, +Silent, +Nth, +Total)
%
%   Load one db/5 term: a snapshot via rdf_load_db/1 or a journal via
%   load_journal/2.  Report the time and the number of triples added.
load_source(DB, Silent, Nth, Total) :-
    db_file_name(DB, File),
    db_graph(DB, Graph),
    message_level(Silent, Level),
    graph_triple_count(Graph, Count0),
    statistics(cputime, T0),
    (   db_is_snapshot(DB)
    ->  print_message(Level, rdf(restore(Silent, snapshot(Graph, File)))),
        rdf_load_db(File)
    ;   print_message(Level, rdf(restore(Silent, journal(Graph, File)))),
        load_journal(File, Graph)
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    graph_triple_count(Graph, Count1),
    Count is Count1 - Count0,
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count, Nth, Total)))).


% Number of triples in Graph, 0 if rdf_statistics/1 does not know it.
graph_triple_count(Graph, Count) :-
    rdf_statistics(triples_by_graph(Graph, Count)),
    !.
graph_triple_count(_, 0).
%   attach_graph(+Graph, +Options)
%
%   Restore Graph from its persistent files.  First remove all its
%   triples, then load its snapshot (if any) and replay its journal
%   (if any).  Option silent(+BoolOrBrief) controls the messages.
%   NOTE(review): no caller of attach_graph/2 is visible in this file.
attach_graph(Graph, Options) :-
    option(silent(Silent), Options, false),
    message_level(Silent, Level),
    db_files(Graph, SnapshotFile, JournalFile),
    rdf_retractall(_,_,_,Graph),
    statistics(cputime, T0),
    print_message(Level, rdf(restore(Silent, Graph))),
    db_file(SnapshotFile, AbsSnapShot),
    (   exists_file(AbsSnapShot)
    ->  print_message(Level, rdf(restore(Silent, snapshot(SnapshotFile)))),
        rdf_load_db(AbsSnapShot)
    ;   true
    ),
    db_file(JournalFile, AbsJournal),   % open the resolved path, not the base name
    (   exists_file(AbsJournal)
    ->  print_message(Level, rdf(restore(Silent, journal(JournalFile)))),
        load_journal(AbsJournal, Graph)
    ;   true
    ),
    statistics(cputime, T1),
    T is T1 - T0,
    (   rdf_statistics(triples_by_graph(Graph, Count))
    ->  true
    ;   Count = 0
    ),
    print_message(Level, rdf(restore(Silent,
                                     done(Graph, T, Count)))).

% message_level(+Silent, -Level): only silent(true) suppresses messages.
message_level(true, silent) :- !.
message_level(_, informational).


                 /*******************************
                 *        LOAD JOURNAL          *
                 *******************************/
%   load_journal(+File, +DB)
%
%   Create graph DB and replay the UTF-8 encoded journal File into it,
%   term by term.  The stream is closed whatever the outcome.
load_journal(File, DB) :-
    rdf_create_graph(DB),
    setup_call_cleanup(
        open(File, read, In, [encoding(utf8)]),
        ( read_term(In, First, []),
          process_journal(First, In, DB)
        ),
        close(In)).

% Replay terms until end_of_file; an unknown term is a type error.
process_journal(end_of_file, _, _) :- !.
process_journal(Term, In, DB) :-
    (   process_journal_term(Term, DB)
    ->  true
    ;   type_error(journal_term, Term)
    ),
    read_term(In, Next, []),
    process_journal(Next, In, DB).

% Apply one journal term to DB.  Bracketing terms are accepted and ignored.
process_journal_term(assert(S,P,O), DB) :-
    rdf_assert(S,P,O,DB).
process_journal_term(assert(S,P,O,Line), DB) :-
    rdf_assert(S,P,O,DB:Line).
process_journal_term(retract(S,P,O), DB) :-
    rdf_retractall(S,P,O,DB).
process_journal_term(retract(S,P,O,Line), DB) :-
    rdf_retractall(S,P,O,DB:Line).
process_journal_term(update(S,P,O,Action), DB) :-
    (   rdf_update(S,P,O,DB, Action)
    ->  true
    ;   print_message(warning, rdf(update_failed(S,P,O,Action)))
    ).
process_journal_term(start(_), _).      % journal open/close
process_journal_term(end(_), _).
process_journal_term(begin(_), _).      % logged transaction (compatibility)
process_journal_term(end, _).
process_journal_term(begin(_,_,_,_), _). % logged transaction (current)
process_journal_term(end(_,_,_), _).


                 /*******************************
                 *        CREATE JOURNAL        *
                 *******************************/

:- dynamic
    blocked_db/2,                   % DB, Reason
    transaction_message/3,          % Nesting, Time, Message
    transaction_db/3.               % Nesting, DB, Id
%   rdf_persistency(+DB, +Bool)
%
%   Specify whether graph DB is persistent.  Switching to false
%   kills the persistent state.  Switching to true
%   creates it.
%
%   @error type_error if DB is not an atom or Bool is not a boolean.
rdf_persistency(DB, Bool) :-
    must_be(atom, DB),
    must_be(boolean, Bool),
    (   Bool == false
    ->  (   blocked_db(DB, persistency)
        ->  true                    % already non-persistent
        ;   assert(blocked_db(DB, persistency)),
            delete_db(DB)
        )
    ;   (   retract(blocked_db(DB, persistency))
        ->  create_db(DB)           % re-create the files from memory
        ;   true
        )
    ).
:- multifile
    rdf_db:property_of_graph/2.

%   rdf_db:property_of_graph(?Property, +Graph)
%
%   Hook that adds persistent(Bool) to the graph properties.  Bool is
%   false if persistency was switched off with rdf_persistency/2, and
%   true otherwise.  The head must be module-qualified with rdf_db:
%   (not the unqualified name rdf_dbproperty_of_graph).
rdf_db:property_of_graph(persistent(State), Graph) :-
    (   blocked_db(Graph, persistency)
    ->  State = false
    ;   State = true
    ).
%   start_monitor/0 registers monitor/1 with rdf_monitor/2 using the mask
%   [-assert(load)].  This presumably covers all events except
%   assert(load); see rdf_monitor/2.  stop_monitor/0 uses the mask
%   [-all] to stop the callbacks.
start_monitor :-
    rdf_monitor(monitor, [-assert(load)]).

stop_monitor :-
    rdf_monitor(monitor, [-all]).
%   monitor(+Term)
%
%   Callback registered with rdf_monitor/2 by start_monitor/0.  All calls
%   from rdf_db.pl that deal with database changes are serialized.
%   They do come from different
%   threads though.
%
%   Each change to a graph that is not blocked (see blocked_db/2) is
%   written to that graph's journal.  The write is preceded by a
%   begin/4 term if it is part of a logged transaction (see
%   open_transaction/2).

monitor(Msg) :-
    debug(monitor, 'Monitor: ~p~n', [Msg]),
    fail.
monitor(assert(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(assert(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [assert(S,P,O)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB:Line)) :-
    !,
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O,Line)]),
    sync_journal(DB, Fd).
monitor(retract(S,P,O,DB)) :-
    \+ blocked_db(DB, _),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd),
    format(Fd, '~q.~n', [retract(S,P,O)]),
    sync_journal(DB, Fd).
monitor(update(S,P,O,DB:Line,Action)) :-
    !,
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB:Line))
    ;   journal_fd(DB, Fd),
        % Like every other journal write, emit the transaction's begin/4
        % term first if needed.
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(update(S,P,O,DB,Action)) :-
    \+ blocked_db(DB, _),
    (   Action = graph(NewDB)
    ->  monitor(assert(S,P,O,NewDB)),
        monitor(retract(S,P,O,DB))
    ;   journal_fd(DB, Fd),
        open_transaction(DB, Fd),
        format(Fd, '~q.~n', [update(S,P,O,Action)]),
        sync_journal(DB, Fd)
    ).
monitor(load(BE, _DumpFileURI)) :-
    (   BE = end(Graphs)
    ->  sync_loaded_graphs(Graphs)
    ;   true
    ).
monitor(create_graph(Graph)) :-
    \+ blocked_db(Graph, _),
    journal_fd(Graph, Fd),
    open_transaction(Graph, Fd),
    sync_journal(Graph, Fd).
monitor(reset) :-
    forall(rdf_graph(Graph), delete_db(Graph)).
                                        % TBD: Remove empty directories?

monitor(transaction(BE, Id)) :-
    monitor_transaction(Id, BE).

%   monitor_transaction(+Id, +BeginOrEnd)
%
%   Handle the start and end of rdf_db transactions:
%     - load_journal, parse and unload block journalling of the graph
%       while they run.
%     - At the end of a parse the graph is snapshotted; at the end of an
%       unload its files are deleted.
%     - log(Msg) and log(Msg, DB) transactions record a message used by
%       open_transaction/2 and end_transactions/2.

monitor_transaction(load_journal(DB), begin(_)) :-
    !,
    assert(blocked_db(DB, journal)).
monitor_transaction(load_journal(DB), end(_)) :-
    !,
    retractall(blocked_db(DB, journal)).

monitor_transaction(parse(URI), begin(_)) :-
    !,
    (   blocked_db(URI, persistency)
    ->  true
    ;   assert(blocked_db(URI, parse))
    ).
monitor_transaction(parse(URI), end(_)) :-
    !,
    (   retract(blocked_db(URI, parse))
    ->  create_db(URI)
    ;   true
    ).
monitor_transaction(unload(DB), begin(_)) :-
    !,
    (   blocked_db(DB, persistency)
    ->  true
    ;   assert(blocked_db(DB, unload))
    ).
monitor_transaction(unload(DB), end(_)) :-
    !,
    (   retract(blocked_db(DB, unload))
    ->  delete_db(DB)
    ;   true
    ).
monitor_transaction(log(Msg), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)).
monitor_transaction(log(_), end(N)) :-
    check_nested(N),
    retract(transaction_message(N, _, _)),
    !,
    findall(DB:Id, retract(transaction_db(N, DB, Id)), DBs),
    end_transactions(DBs, N).
monitor_transaction(log(Msg, DB), begin(N)) :-
    !,
    check_nested(N),
    get_time(Time),
    asserta(transaction_message(N, Time, Msg)),
    journal_fd(DB, Fd),
    open_transaction(DB, Fd).
monitor_transaction(log(Msg, _DB), end(N)) :-
    monitor_transaction(log(Msg), end(N)).
%   check_nested(+Level)
%
%   True if Level is 0 (a top-level transaction) or if the option
%   log_nested_transactions(true)
%   is defined.
check_nested(Level) :-
    (   Level = 0
    ->  true
    ;   rdf_option(log_nested_transactions(true))
    ).
%   open_transaction(+DB, +Fd)
%
%   Add a begin(Id, Level, Time, Message) term if a transaction
%   involves DB.  Id is an incremental integer, where each database
%   has its own counter.  Level is the nesting level, Time a floating
%   point timestamp (rounded to 1/100 second) and Message the message
%   provided as argument to
%   the log message.
%
%   Only the innermost recorded transaction message is used.  A DB that
%   already has a transaction_db/3 entry for that level is not
%   announced again.
open_transaction(DB, Fd) :-
    (   transaction_message(Level, Stamp, Message)
    ->  (   transaction_db(Level, DB, _)
        ->  true
        ;   next_transaction_id(DB, Id),
            assert(transaction_db(Level, DB, Id)),
            Rounded is round(Stamp*100)/100,
            format(Fd, '~q.~n', [begin(Id, Level, Rounded, Message)])
        )
    ;   true                        % not inside a logged transaction
    ).
:- dynamic
    current_transaction_id/2.

%   next_transaction_id(+DB, -Id)
%
%   Id is the next transaction id for DB.  The counter is cached in
%   current_transaction_id/2.  On first use it is initialised from the
%   last end/3 term in the journal of DB, or from 1 if there is no
%   journal.
next_transaction_id(DB, Id) :-
    retract(current_transaction_id(DB, Last)),
    !,
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, Id) :-
    db_files(DB, _, Journal),
    db_file(Journal, File),         % Journal is a base name; resolve it
    exists_file(File),
    !,
    size_file(File, Size),
    open_db(Journal, read, In, []),
    call_cleanup(iterative_expand(In, Size, Last), close(In)),
    Id is Last + 1,
    assert(current_transaction_id(DB, Id)).
next_transaction_id(DB, 1) :-
    assert(current_transaction_id(DB, 1)).

%   iterative_expand(+In, +Size, -Last)
%
%   Find the last transaction id in the journal.  First scan the final
%   2^10, 2^11, ... bytes of the file; if no end/3 term is found this
%   way, scan the whole file.
iterative_expand(_, 0, 0) :- !.
iterative_expand(In, Size, Last) :-     % Scan growing sections from the end
    Max is floor(log(Size)/log(2)),
    between(10, Max, Step),
    Offset is -(1<<Step),
    seek(In, Offset, eof, _),
    skip(In, 10),                   % records are line-based
    read(In, T0),
    last_transaction_id(T0, In, 0, Last),
    Last > 0,
    !.
iterative_expand(In, _, Last) :-        % Scan the whole file
    seek(In, 0, bof, _),
    read(In, T0),
    last_transaction_id(T0, In, 0, Last).

% The Id of the last end(Id, _, _) term read before end_of_file.
last_transaction_id(end_of_file, _, Last, Last) :- !.
last_transaction_id(end(Id, _, _), In, _, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
last_transaction_id(_, In, Id, Last) :-
    read(In, T1),
    last_transaction_id(T1, In, Id, Last).
%   end_transactions(+DBs, +Nesting)
%
%   In each database, the transaction is ended with a term end(Id,
%   Nesting, Others), where Id and Nesting are the transaction
%   identifier and nesting (see open_transaction/2) and Others is a
%   list of DB:Id, indicating other databases affected by the
%   transaction.
end_transactions(DBs, N) :-
    end_transactions(DBs, DBs, N).

% For every DB:Id in Pending, write end(Id, Nesting, Others) to the
% journal of DB, where Others is All without DB:Id.  Then flush.
end_transactions(Pending, All, Nesting) :-
    forall(member(DB:Id, Pending),
           (   journal_fd(DB, Fd),
               once(select(DB:Id, All, Others)),
               format(Fd, 'end(~q, ~q, ~q).~n', [Id, Nesting, Others]),
               sync_journal(DB, Fd)
           )).
%   sync_loaded_graphs(+Graphs)
%
%   After a load has ended, write a fresh snapshot for each loaded graph
%   (see create_db/1).
sync_loaded_graphs(Graphs) :-
    forall(member(Graph, Graphs),
           create_db(Graph)).


                 /*******************************
                 *         JOURNAL FILES        *
                 *******************************/
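%   journal_fd(+DB, -Stream)
%
%   Get an open stream to the journal of DB.  If its journal is not yet
%   open, the pool of open journals is first limited according to the
%   max_open_journals option.
%   Then the journal is opened in append mode.  Journal files are
%   always encoded as UTF-8 for portability as well as to ensure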
%   full coverage of Unicode.
%
%   The fast path needs no lock.  Opening happens under the
%   rdf_journal_file mutex, which re-checks for a stream that another
%   thread may have just opened.
journal_fd(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   with_mutex(rdf_journal_file, journal_fd_(DB, Out)),
        Fd = Out
    ).

journal_fd_(DB, Fd) :-
    (   source_journal_fd(DB, Fd)
    ->  true
    ;   limit_fd_pool,
        db_files(DB, _Snapshot, Journal),
        open_db(Journal, append, Fd,
                [ close_on_abort(false)
                ]),
        time_stamp(Now),
        format(Fd, '~q.~n', [start([time(Now)])]),
        assert(source_journal_fd(DB, Fd))   % new one at the end
    ).
%   limit_fd_pool
%
%   Make room for one more journal stream.  Close the oldest journals so
%   that, after the caller (journal_fd_/2) opens a new one, at most Max
%   journals are open.  Max is the max_open_journals option, default 10.
%   New streams are appended with assert/1, so the first
%   source_journal_fd/2 clause is the oldest.
limit_fd_pool :-
    predicate_property(source_journal_fd(_, _), number_of_clauses(N)),
    !,
    (   rdf_option(max_open_journals(Max))
    ->  true
    ;   Max = 10
    ),
    Close is N - Max + 1,           % +1 reserves the slot for the new journal
    forall(between(1, Close, _),
           close_oldest_journal).
limit_fd_pool.

close_oldest_journal :-
    source_journal_fd(DB, _Fd),
    !,
    debug(rdf_persistency, 'Closing old journal for ~q', [DB]),
    close_journal(DB).
close_oldest_journal.
%   sync_journal(+DB, +Fd)
%
%   Flush the journal stream unless DB takes part in a logged
%   transaction.  In that case flushing is deferred to
%   end_transactions/2, which runs after the transaction_db/3 entries
%   are retracted.
sync_journal(DB, Fd) :-
    (   transaction_db(_, DB, _)
    ->  true
    ;   flush_output(Fd)
    ).
%   close_journal(+DB)
%
%   Close the journal of DB, if open, under the rdf_journal_file mutex.
%   An end([time(T)]) term is written first.
close_journal(DB) :-
    with_mutex(rdf_journal_file,
               close_journal_(DB)).

close_journal_(DB) :-
    retract(source_journal_fd(DB, Fd)),
    !,
    time_stamp(Now),
    format(Fd, '~q.~n', [end([time(Now)])]),
    close(Fd, [force(true)]).
close_journal_(_).
%   close_journals
%
%   Close all open journals.  A failure to close one journal is
%   reported as an error message and does not stop the others.
close_journals :-
    findall(DB, source_journal_fd(DB, _), DBs),
    forall(member(DB, DBs),
           catch(close_journal(DB), Error,
                 print_message(error, Error))).
%   create_db(+Graph)
%
%   Write a fresh snapshot of Graph and drop its journal.  An empty
%   graph has its files deleted instead.  The snapshot is first written
%   to <snapshot>.new.  If saving raises an error, a warning is printed
%   and the temporary file is removed.
create_db(Graph) :-
    (   \+ rdf(_,_,_,Graph)
    ->  debug(rdf_persistency, 'Deleting empty Graph ~w', [Graph]),
        delete_db(Graph)
    ;   debug(rdf_persistency, 'Saving Graph ~w', [Graph]),
        close_journal(Graph),
        db_abs_files(Graph, Snap, Jrn),
        atom_concat(Snap, '.new', TmpSnap),
        (   catch(( create_directory_levels(Snap),
                    rdf_save_db(TmpSnap, Graph)
                  ), Error,
                  ( print_message(warning, Error),
                    fail
                  ))
        ->  (   exists_file(Jrn)
            ->  delete_file(Jrn)
            ;   true
            ),
            rename_file(TmpSnap, Snap),
            debug(rdf_persistency, 'Saved Graph ~w', [Graph])
        ;   catch(delete_file(TmpSnap), _, true)
        )
    ).
%   delete_db(+DB)
%
%   Close the journal of DB and delete its journal and snapshot files,
%   under the rdf_journal_file mutex.
delete_db(DB) :-
    with_mutex(rdf_journal_file,
               delete_db_(DB)).

delete_db_(DB) :-
    (   close_journal_(DB),
        db_abs_files(DB, Snapshot, Journal)
    ->  forall(member(File, [Journal, Snapshot]),
               (   exists_file(File)
               ->  delete_file(File)
               ;   true
               ))
    ;   true
    ).

                 /*******************************
                 *           LOCKING            *
                 *******************************/
%   lock_db(+Dir)
%
%   Take a non-blocking write lock on Dir/lock and write a locked/1 term
%   (time, pid, host) into it.  The stream is kept open in rdf_lock/2
%   and released at halt.  If the lock is held elsewhere, raise a
%   permission error via locked_error/2.
lock_db(Dir) :-
    lockfile(Dir, LockFile),
    catch(open(LockFile, update, Stream, [lock(write), wait(false)]),
          error(permission_error(Access, _, _), _),
          locked_error(Access, Dir)),
    (   current_prolog_flag(pid, Pid)
    ->  true
    ;   Pid = 0                     % TBD: Fix in Prolog
    ),
    time_stamp(Now),
    gethostname(Host),
    format(Stream, '/* RDF Database is in use */~n~n~q.~n',
           [ locked([ time(Now),
                      pid(Pid),
                      host(Host)
                    ])
           ]),
    flush_output(Stream),
    set_end_of_stream(Stream),
    assert(rdf_lock(Dir, lock(Stream, LockFile))),
    at_halt(unlock_db(Dir)).

% Raise the error for a failed lock.  If the lock file holds a single
% locked/1 term, its details become the error context.
locked_error(lock, Dir) :-
    lockfile(Dir, LockFile),
    (   catch(read_file_to_terms(LockFile, Terms, []), _, fail),
        Terms = [locked(Details)]
    ->  Context = rdf_locked(Details)
    ;   Context = context(_, 'Database is in use')
    ),
    throw(error(permission_error(lock, rdf_db, Dir), Context)).
locked_error(open, Dir) :-
    throw(error(permission_error(lock, rdf_db, Dir),
                context(_, 'Lock file cannot be opened'))).
%   unlock_db(+Dir)
%
%   Release the lock on Dir, if we hold one: close the lock stream and
%   delete the lock file.
unlock_db(Dir) :-
    (   retract(rdf_lock(Dir, lock(Stream, LockFile)))
    ->  unlock_db(Stream, LockFile)
    ;   true
    ).

unlock_db(Stream, LockFile) :-
    close(Stream),
    delete_file(LockFile).

                 /*******************************
                 *           FILENAMES          *
                 *******************************/

% The lock file is Dir/lock.
lockfile(Dir, LockFile) :-
    atomic_list_concat([Dir, /, lock], LockFile).

% Directory depth for graph files: the directory_levels option or 2.
directory_levels(Levels) :-
    (   rdf_option(directory_levels(Levels))
    ->  true
    ;   Levels = 2
    ).

% db_file(+Base, -File): absolute path of file Base in the attached store.
db_file(Base, File) :-
    rdf_directory(Dir),
    directory_levels(Levels),
    db_file(Dir, Base, Levels, File).

db_file(Dir, Base, Levels, File) :-
    dir_levels(Base, Levels, Segments, [Base]),
    atomic_list_concat([Dir|Segments], /, File).

% open_db(+Base, +Mode, -Stream, +Options): open a store file as UTF-8,
% creating its intermediate directories first.
open_db(Base, Mode, Stream, Options) :-
    db_file(Base, File),
    create_directory_levels(File),
    open(File, Mode, Stream, [encoding(utf8)|Options]).

create_directory_levels(File) :-
    (   rdf_option(directory_levels(0))
    ->  true                        % flat layout: nothing to create
    ;   file_directory_name(File, Dir),
        make_directory_path(Dir)
    ).

exists_db(Base) :-
    db_file(Base, File),
    exists_file(File).
%   dir_levels(+File, +Levels, -Segments, ?Tail)
%
%   Segments is a difference list (ending in Tail) of Levels directory
%   names.  Each name is the next two characters of the MD5 hash of File
%   produced by rdf_atom_md5/3.
dir_levels(File, Levels, Segments, Tail) :-
    (   Levels = 0,
        Segments = Tail
    ->  true
    ;   rdf_atom_md5(File, 1, Hash),
        create_dir_levels(Levels, 0, Hash, Segments, Tail)
    ).

% Take Count two-character slices of Hash, starting at Offset.
create_dir_levels(Count, Offset, Hash, Segments, Tail) :-
    (   Count = 0,
        Segments = Tail
    ->  true
    ;   Segments = [Slice|Rest],
        sub_atom(Hash, Offset, 2, _, Slice),
        NextOffset is Offset + 2,
        Remaining is Count - 1,
        create_dir_levels(Remaining, NextOffset, Hash, Rest, Tail)
    ).
%   db_files(?DB, ?Snapshot, ?Journal)
%
%   Relate graph DB to its snapshot (.trp) and journal (.jrn) base
%   names.  At least one argument must be bound; the first bound one
%   (in argument order) determines the others.
db_files(DB, Snapshot, Journal) :-
    (   nonvar(DB)
    ->  rdf_db_to_file(DB, Base),
        atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal)
    ;   nonvar(Snapshot)
    ->  atom_concat(Base, '.trp', Snapshot),
        atom_concat(Base, '.jrn', Journal),
        rdf_db_to_file(DB, Base)
    ;   nonvar(Journal)
    ->  atom_concat(Base, '.jrn', Journal),
        atom_concat(Base, '.trp', Snapshot),
        rdf_db_to_file(DB, Base)
    ).

% As db_files/3, but with absolute paths in the attached store.
db_abs_files(DB, Snapshot, Journal) :-
    db_files(DB, SnapBase, JrnBase),
    db_file(SnapBase, Snapshot),
    db_file(JrnBase, Journal).
%   rdf_journal_file(?Graph, -Journal)
%
%   Journal is the absolute path of an existing journal file of Graph.
%   If Graph is unbound, enumerate the graphs known to rdf_graph/1.
rdf_journal_file(Graph, Journal) :-
    (   nonvar(Graph)
    ->  true
    ;   rdf_graph(Graph)
    ),
    db_abs_files(Graph, _Snapshot, Journal),
    exists_file(Journal).
%   rdf_snapshot_file(?Graph, -Snapshot)
%
%   Snapshot is the absolute path of an existing snapshot file of
%   Graph.  If Graph is unbound, enumerate the graphs known to
%   rdf_graph/1.
rdf_snapshot_file(Graph, Snapshot) :-
    (   nonvar(Graph)
    ->  true
    ;   rdf_graph(Graph)            % also pick the empty graphs
    ),
    db_abs_files(Graph, Snapshot, _Journal),
    exists_file(Snapshot).
%   rdf_db_to_file(?DB, ?File)
%
%   Map between a graph name and its file base name.  Results are cached
%   in file_base_db/2; otherwise they are computed with
%   url_to_filename/2.
rdf_db_to_file(DB, File) :-
    (   file_base_db(File, DB)
    ->  true
    ;   url_to_filename(DB, File),
        assert(file_base_db(File, DB))
    ).
%   url_to_filename(+URL, -FileName)
%   url_to_filename(-URL, +FileName)
%
%   Map between a graph name and a file base name.
%
%   Forward direction (URL atomic), code by code:
%     - an ASCII alphanumeric or '_' is kept;
%     - a space becomes '+';
%     - CRLF or LF becomes %0D%0A;
%     - any other code becomes %XX.
%   Reverse direction: FileName is decoded with uri_encoded/3.
%
%   NOTE(review): the %XX clause only encodes the low 8 bits of a code,
%   so codes above 255 are not preserved.  The mapping determines file
%   names on disk, so any change needs a migration -- confirm whether
%   such graph names must be supported.
url_to_filename(URL, FileName) :-
    atomic(URL),
    !,
    atom_codes(URL, Codes),
    phrase(url_encode(EncCodes), Codes),
    atom_codes(FileName, EncCodes).
url_to_filename(URL, FileName) :-
    uri_encoded(path, URL, FileName).

% url_encode(-Encoded)// : encode the remaining input codes.
url_encode([0'+|T]) -->
    " ",
    !,
    url_encode(T).
url_encode([C|T]) -->
    alphanum(C),
    !,
    url_encode(T).
url_encode([C|T]) -->
    no_enc_extra(C),
    !,
    url_encode(T).
url_encode(Enc) -->
    (   "\r\n"
    ;   "\n"
    ),
    !,
    { string_codes("%0D%0A", Codes),
      append(Codes, T, Enc)
    },
    url_encode(T).
url_encode([]) -->
    eos,
    !.
% Fallback: %XX built from two 4-bit nibbles of the code.
url_encode([0'%,D1,D2|T]) -->
    [C],
    { Dv1 is (C>>4 /\ 0xf),
      Dv2 is (C /\ 0xf),
      code_type(D1, xdigit(Dv1)),
      code_type(D2, xdigit(Dv2))
    },
    url_encode(T).

% eos// : succeeds only at the end of the input.
eos([], []).

% alphanum(-C)// : an US-ASCII alphanumeric code.
alphanum(C) -->
    [C],
    { C < 128,                      % US-ASCII
      code_type(C, alnum)
    }.

% Non-alphanumeric codes that are kept unencoded.
no_enc_extra(0'_) --> "_".


                 /*******************************
                 *            REINDEX           *
                 *******************************/
%   reindex_db(+Dir, +Levels)
%
%   Move every snapshot/journal file below Dir that is not at depth
%   Levels to the path db_file/4 computes for Levels.  Afterwards remove
%   the directories that became empty.
reindex_db(Dir, Levels) :-
    directory_files(Dir, Files),
    reindex_files(Files, Dir, '.', 0, Levels),
    remove_empty_directories(Files, Dir).

reindex_files([], _, _, _, _).
reindex_files([Nofollow|Files], Dir, Prefix, CLevel, Levels) :-
    nofollow(Nofollow),
    !,
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([File|Files], Dir, Prefix, CLevel, Levels) :-
    CLevel \== Levels,
    file_name_extension(_Base, Ext, File),
    db_extension(Ext),
    !,
    directory_file_path(Prefix, File, DBFile),
    directory_file_path(Dir, DBFile, OldPath),
    db_file(Dir, File, Levels, NewPath),
    debug(rdf_persistency, 'Rename ~q --> ~q', [OldPath, NewPath]),
    file_directory_name(NewPath, NewDir),
    make_directory_path(NewDir),
    rename_file(OldPath, NewPath),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([D|Files], Dir, Prefix, CLevel, Levels) :-
    directory_file_path(Prefix, D, SubD),
    directory_file_path(Dir, SubD, AbsD),
    exists_directory(AbsD),
    \+ read_link(AbsD, _, _),       % Do not follow links
    !,
    directory_files(AbsD, SubFiles),
    CLevel2 is CLevel + 1,
    reindex_files(SubFiles, Dir, SubD, CLevel2, Levels),
    reindex_files(Files, Dir, Prefix, CLevel, Levels).
reindex_files([_|Files], Dir, Prefix, CLevel, Levels) :-
    reindex_files(Files, Dir, Prefix, CLevel, Levels).


%   remove_empty_directories(+Files, +Dir)
%
%   For each entry of Files that is a real (non-link) sub-directory of
%   Dir, first prune empty directories below it, then delete it if it
%   is empty.  Pruning first means a directory that only held empty
%   sub-directories is removed too.
remove_empty_directories([], _).
remove_empty_directories([File|Files], Dir) :-
    \+ nofollow(File),
    directory_file_path(Dir, File, Path),
    exists_directory(Path),
    \+ read_link(Path, _, _),
    !,
    directory_files(Path, Content),
    exclude(nofollow, Content, RealContent),
    remove_empty_directories(RealContent, Path),
    directory_files(Path, Remaining),       % re-read after pruning
    exclude(nofollow, Remaining, RealRemaining),
    (   RealRemaining == []
    ->  debug(rdf_persistency, 'Remove empty dir ~q', [Path]),
        delete_directory(Path)
    ;   true
    ),
    remove_empty_directories(Files, Dir).
remove_empty_directories([_|Files], Dir) :-
    remove_empty_directories(Files, Dir).


                 /*******************************
                 *            PREFIXES          *
                 *******************************/

%   save_prefixes(+Dir)
%
%   Write all currently registered RDF prefixes to Dir/prefixes.db.
save_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    setup_call_cleanup(open(PrefixFile, write, Out, [encoding(utf8)]),
                       write_prefixes(Out),
                       close(Out)).

write_prefixes(Out) :-
    format(Out, '% Snapshot of defined RDF prefixes~n~n', []),
    forall(rdf_current_ns(Alias, URI),
           format(Out, 'prefix(~q, ~q).~n', [Alias, URI])).
load_prefixes(Dir) :-
    atomic_list_concat([Dir, /, 'prefixes.db'], PrefixFile),
    (   \+ exists_file(PrefixFile)
    ->  true                            % nothing saved: nothing to load
    ;   setup_call_cleanup(
            open(PrefixFile, read, In, [encoding(utf8)]),
            read_prefixes(In),
            close(In))
    ).

%   read_prefixes(+In)
%
%   Read prefix(Alias, URI) terms from the stream In until end_of_file.
%   Each one is registered with rdf_register_ns/3.
%
%     - Alias and URI must be atoms (checked with must_be/2).
%     - Registration errors are printed as warnings.
%     - Any other term raises domain_error(prefix_term, Term).

read_prefixes(In) :-
    read_term(In, Term, []),
    read_prefixes(Term, In).

read_prefixes(end_of_file, _In) :-
    !.
read_prefixes(prefix(Alias, URI), In) :-
    !,
    must_be(atom, Alias),
    must_be(atom, URI),
    catch(rdf_register_ns(Alias, URI, []),
          Error,
          print_message(warning, Error)),
    read_prefixes(In).
read_prefixes(Term, _In) :-
    domain_error(prefix_term, Term).


                 /*******************************
                 *             UTIL             *
                 *******************************/
mkdir(Directory) :-
    (   exists_directory(Directory)
    ->  true                            % already present: nothing to do
    ;   make_directory(Directory)
    ).
time_stamp(Int) :-
    get_time(Now),
    Int is round(Now).


                 /*******************************
                 *           MESSAGES           *
                 *******************************/

:- multifile
    prolog:message/3,
    prolog:message_context/3.

%   prolog:message(+Term)//
%
%   Hook for print_message/2.  Messages of this module are wrapped as
%   rdf(Term) and translated by message//1.

prolog:message(rdf(Term)) -->
    message(Term).

%   message(+Term)//
%
%   Translate the message terms of this module.  Clause order matters:
%   print_message/2 uses the first translation found.  The specific
%   restore(_, Compound) clauses must therefore precede the generic
%   restore(_, Graph) clause.

message(restoring(Type, Count, Jobs)) -->
    [ 'Restoring ~D ~w using ~D concurrent workers'-[Count, Type, Jobs] ].
message(restore(attached(Graphs, Triples, Time/Wall))) -->
    { catch(Percent is round(100*Time/Wall), _, Percent = 0) },
    [ 'Loaded ~D graphs (~D triples) in ~2f sec. (~d% CPU = ~2f sec.)'-
      [Graphs, Triples, Wall, Percent, Time] ].
% attach_graph/2
message(restore(true, Action)) -->
    !,
    silent_message(Action).
message(restore(brief, Action)) -->
    !,
    brief_message(Action).
message(restore(_, snapshot(_))) -->
    [ at_same_line, '(snapshot) '-[], flush ].
message(restore(_, journal(_))) -->
    [ at_same_line, '(journal) '-[], flush ].
% attach_graph/2 and load_source/4
message(restore(_, done(_, Time, Count))) -->
    [ at_same_line, '~D triples in ~2f sec.'-[Count, Time] ].
% load_source/4
message(restore(_, snapshot(G, _))) -->
    [ 'Restoring ~p\t(snapshot)'-[G], flush ].
message(restore(_, journal(G, _))) -->
    [ 'Restoring ~p\t(journal)'-[G], flush ].
% Generic announcement of the graph being restored.  It must come after
% the compound restore/2 actions above, which it would otherwise shadow.
message(restore(_, Graph)) -->
    [ 'Restoring ~p ... '-[Graph], flush ].
% journal handling
message(update_failed(S,P,O,Action)) -->
    [ 'Failed to update <~p ~p ~p> with ~p'-[S,P,O,Action] ].
% directory reindexing
message(reindex(Count, Depth)) -->
    [ 'Restructuring database with ~d levels (~D graphs)'-[Depth, Count] ].
message(reindex(Depth)) -->
    [ 'Fixing database directory structure (~d levels)'-[Depth] ].
message(read_only) -->
    [ 'Cannot write persistent store; continuing in read-only mode.', nl,
      'All changes to the RDF store will be lost if this process terminates.'
    ].

%   silent_message(+Action)//
%
%   Verbosity `true`: print nothing.

silent_message(_Action) --> [].

%   brief_message(+Action)//
%
%   Verbosity `brief`: only done/5 prints a progress line; everything
%   else is silent.

brief_message(done(Graph, _Time, _Count, Nth, Total)) -->
    { file_base_name(Graph, Base) },
    [ at_same_line,
      '\r~p~`.t ~D of ~D graphs~72|'-[Base, Nth, Total],
      flush
    ].
brief_message(_) --> [].


%   prolog:message_context(+Context)//
%
%   Add the lock owner (time and process id) to rdf_locked errors.

prolog:message_context(rdf_locked(Args)) -->
    { memberchk(time(Time), Args),
      memberchk(pid(Pid), Args),
      format_time(string(S), '%+', Time)
    },
    [ nl,
      'locked at ~s by process id ~w'-[S,Pid]
    ].
RDF persistency plugin
This module provides persistency for rdf_db.pl based on the rdf_monitor/2 predicate to track changes to the repository. Previous versions used autosave of the whole database using the quick-load format of rdf_db. This version instead uses a quick-load file per source (4th argument of rdf/4), plus journalling for edit operations. The result is safe. It avoids frequent small changes to large files, which make synchronisation and backup expensive. It also avoids long disruption of the server while autosaving. Only loading large files disrupts service for some time.
The persistent backup of the database is realised in a directory, using a lock file to avoid corruption due to concurrent access. Each source is represented by two files, the latest snapshot and a journal. The state is restored by loading the snapshot and replaying the journal. The predicate rdf_flush_journals/1 can be used to create fresh snapshots and delete the journals.
@see rdf_edit.pl
*/