Difference between revisions of "PyLoDStorage"
Jump to navigation
Jump to search
Line 99: | Line 99: | ||
-k | --kill: kill the running fuseki server | -k | --kill: kill the running fuseki server | ||
-l | --load [ttl file]: download jena / tdbloader and load given ttl file | -l | --load [ttl file]: download jena / tdbloader and load given ttl file | ||
+ | </source> | ||
+ | = Example Unit tests = | ||
+ | == SQLDB unit test == | ||
+ | <source lang='python'> | ||
+ | ''' | ||
+ | Created on 2020-08-24 | ||
+ | |||
+ | @author: wf | ||
+ | ''' | ||
+ | import unittest | ||
+ | from datetime import datetime | ||
+ | import time | ||
+ | import os | ||
+ | import sys | ||
+ | from lodstorage.sample import Sample | ||
+ | from lodstorage.uml import UML | ||
+ | from lodstorage.sql import SQLDB, EntityInfo | ||
+ | |||
+ | |||
+ | class TestSQLDB(unittest.TestCase): | ||
+ | ''' | ||
+ | Test the SQLDB database wrapper | ||
+ | ''' | ||
+ | |||
+ | def setUp(self): | ||
+ | self.debug=True | ||
+ | pass | ||
+ | |||
+ | def tearDown(self): | ||
+ | pass | ||
+ | |||
+ | def checkListOfRecords(self,listOfRecords,entityName,primaryKey=None,executeMany=True,fixDates=False,debug=False,doClose=True): | ||
+ | ''' | ||
+ | check the handling of the given list of Records | ||
+ | |||
+ | Args: | ||
+ | |||
+ | listOfRecords(list): a list of dicts that contain the data to be stored | ||
+ | entityName(string): the name of the entity type to be used as a table name | ||
+ | primaryKey(string): the name of the key / column to be used as a primary key | ||
+ | executeMany(boolean): True if executeMany mode of sqlite3 should be used | ||
+ | debug(boolean): True if debug information e.g. CREATE TABLE and INSERT INTO commands should be shown | ||
+ | doClose(boolean): True if the connection should be closed | ||
+ | |||
+ | ''' | ||
+ | size=len(listOfRecords) | ||
+ | print("%s size is %d fixDates is: %r" % (entityName,size,fixDates)) | ||
+ | self.sqlDB=SQLDB(debug=debug,errorDebug=True) | ||
+ | entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityName,primaryKey) | ||
+ | startTime=time.time() | ||
+ | self.sqlDB.store(listOfRecords,entityInfo,executeMany=executeMany) | ||
+ | elapsed=time.time()-startTime | ||
+ | print ("adding %d %s records took %5.3f s => %5.f records/s" % (size,entityName,elapsed,size/elapsed)) | ||
+ | resultList=self.sqlDB.queryAll(entityInfo,fixDates=fixDates) | ||
+ | print ("selecting %d %s records took %5.3f s => %5.f records/s" % (len(resultList),entityName,elapsed,len(resultList)/elapsed)) | ||
+ | if doClose: | ||
+ | self.sqlDB.close() | ||
+ | return resultList | ||
+ | |||
+ | def testEntityInfo(self): | ||
+ | ''' | ||
+ | test creating entityInfo from the sample record | ||
+ | ''' | ||
+ | listOfRecords=Sample.getRoyals() | ||
+ | entityInfo=EntityInfo(listOfRecords[:3],'Person','name',debug=True) | ||
+ | self.assertEqual("CREATE TABLE Person(name TEXT PRIMARY KEY,born DATE,numberInLine INTEGER,wikidataurl TEXT,age FLOAT,ofAge BOOLEAN,lastmodified TIMESTAMP)",entityInfo.createTableCmd) | ||
+ | self.assertEqual("INSERT INTO Person (name,born,numberInLine,wikidataurl,age,ofAge,lastmodified) values (:name,:born,:numberInLine,:wikidataurl,:age,:ofAge,:lastmodified)",entityInfo.insertCmd) | ||
+ | self.sqlDB=SQLDB(debug=self.debug,errorDebug=True) | ||
+ | entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityInfo.name,entityInfo.primaryKey) | ||
+ | tableList=self.sqlDB.getTableList() | ||
+ | if self.debug: | ||
+ | print (tableList) | ||
+ | self.assertEqual(1,len(tableList)) | ||
+ | personTable=tableList[0] | ||
+ | self.assertEqual("Person",personTable['name']) | ||
+ | self.assertEqual(7,len(personTable['columns'])) | ||
+ | uml=UML() | ||
+ | plantUml=uml.tableListToPlantUml(tableList,packageName="Royals",withSkin=False) | ||
+ | if self.debug: | ||
+ | print(plantUml) | ||
+ | expected="""package Royals { | ||
+ | class Person << Entity >> { | ||
+ | age : FLOAT | ||
+ | born : DATE | ||
+ | lastmodified : TIMESTAMP | ||
+ | name : TEXT <<PK>> | ||
+ | numberInLine : INTEGER | ||
+ | ofAge : BOOLEAN | ||
+ | wikidataurl : TEXT | ||
+ | } | ||
+ | } | ||
+ | """ | ||
+ | self.assertEqual(expected,plantUml) | ||
+ | |||
+ | # testGeneralization | ||
+ | listOfRecords=[{'name': 'Royal family', 'country': 'UK', 'lastmodified':datetime.now()}] | ||
+ | entityInfo=self.sqlDB.createTable(listOfRecords[:10],'Family','name') | ||
+ | tableList=self.sqlDB.getTableList() | ||
+ | self.assertEqual(2,len(tableList)) | ||
+ | uml=UML() | ||
+ | plantUml=uml.tableListToPlantUml(tableList,generalizeTo="PersonBase",withSkin=False) | ||
+ | print(plantUml) | ||
+ | expected='''class PersonBase << Entity >> { | ||
+ | lastmodified : TIMESTAMP | ||
+ | name : TEXT <<PK>> | ||
+ | } | ||
+ | class Person << Entity >> { | ||
+ | age : FLOAT | ||
+ | born : DATE | ||
+ | numberInLine : INTEGER | ||
+ | ofAge : BOOLEAN | ||
+ | wikidataurl : TEXT | ||
+ | } | ||
+ | class Family << Entity >> { | ||
+ | country : TEXT | ||
+ | } | ||
+ | PersonBase <|-- Person | ||
+ | PersonBase <|-- Family | ||
+ | ''' | ||
+ | self.assertEqual(expected,plantUml) | ||
+ | |||
+ | def testUniqueConstraint(self): | ||
+ | ''' | ||
+ | test for https://github.com/WolfgangFahl/pyLoDStorage/issues/4 | ||
+ | sqlite3.IntegrityError: UNIQUE constraint failed: ... show debug info | ||
+ | ''' | ||
+ | listOfDicts=[ | ||
+ | {"name": "John Doe"}, | ||
+ | {"name": "Frank Doe"}, | ||
+ | {"name": "John Doe"}, | ||
+ | {"name":"Tim Doe"}] | ||
+ | sqlDB=SQLDB(debug=self.debug,errorDebug=True) | ||
+ | entityInfo=sqlDB.createTable(listOfDicts[:10],'Does','name') | ||
+ | try: | ||
+ | sqlDB.store(listOfDicts,entityInfo,executeMany=False) | ||
+ | self.fail("There should be an exception") | ||
+ | except Exception as ex: | ||
+ | expected="""INSERT INTO Does (name) values (:name) | ||
+ | failed:UNIQUE constraint failed: Does.name | ||
+ | record #3={'name': 'John Doe'}""" | ||
+ | errMsg=str(ex) | ||
+ | self.assertEqual(expected,errMsg) | ||
+ | |||
+ | def testSqlite3(self): | ||
+ | ''' | ||
+ | test sqlite3 with a few records from the royal family | ||
+ | ''' | ||
+ | listOfRecords=Sample.getRoyals() | ||
+ | resultList=self.checkListOfRecords(listOfRecords, 'Person', 'name',debug=True) | ||
+ | if self.debug: | ||
+ | print(resultList) | ||
+ | self.assertEqual(listOfRecords,resultList) | ||
+ | |||
+ | def testBindingError(self): | ||
+ | ''' | ||
+ | test list of Records with incomplete record leading to | ||
+ | "You did not supply a value for binding 2" | ||
+ | see https://bugs.python.org/issue41638 | ||
+ | ''' | ||
+ | listOfRecords=[{'name':'Pikachu', 'type':'Electric'},{'name':'Raichu' }] | ||
+ | for executeMany in [True,False]: | ||
+ | try: | ||
+ | self.checkListOfRecords(listOfRecords,'Pokemon','name',executeMany=executeMany) | ||
+ | self.fail("There should be an exception") | ||
+ | except Exception as ex: | ||
+ | if self.debug: | ||
+ | print(str(ex)) | ||
+ | self.assertTrue('no value supplied for column' in str(ex)) | ||
+ | |||
+ | def testListOfCities(self): | ||
+ | ''' | ||
+ | test sqlite3 with some 120000 city records | ||
+ | ''' | ||
+ | listOfRecords=Sample.getCities() | ||
+ | for fixDates in [True,False]: | ||
+ | retrievedList=self.checkListOfRecords(listOfRecords,'City',fixDates=fixDates) | ||
+ | self.assertEqual(len(listOfRecords),len(retrievedList)) | ||
+ | |||
+ | def testQueryParams(self): | ||
+ | ''' | ||
+ | test Query Params | ||
+ | ''' | ||
+ | listOfDicts=[ | ||
+ | {"city": "New York", "country": "US"}, | ||
+ | {"city": "Amsterdam", "country": "NL"}, | ||
+ | {"city": "Paris", "country": "FR"}] | ||
+ | sqlDB=SQLDB(debug=self.debug,errorDebug=True) | ||
+ | entityInfo=sqlDB.createTable(listOfDicts[:10],'cities','city') | ||
+ | sqlDB.store(listOfDicts,entityInfo,executeMany=False) | ||
+ | query="SELECT * from cities WHERE country in (?)" | ||
+ | params=('FR',) | ||
+ | frCities=sqlDB.query(query,params) | ||
+ | if self.debug: | ||
+ | print (frCities); | ||
+ | self.assertEqual([{'city': 'Paris', 'country': 'FR'}],frCities) | ||
+ | |||
+ | def testSqllite3Speed(self): | ||
+ | ''' | ||
+ | test sqlite3 speed with some 100000 artificial sample records | ||
+ | consisting of two columns with a running index | ||
+ | ''' | ||
+ | limit=100000 | ||
+ | listOfRecords=Sample.getSample(limit) | ||
+ | self.checkListOfRecords(listOfRecords, 'Sample', 'pKey') | ||
+ | |||
+ | def testBackup(self): | ||
+ | ''' | ||
+ | test creating a backup of the SQL database | ||
+ | ''' | ||
+ | if sys.version_info >= (3, 7): | ||
+ | listOfRecords=Sample.getCities() | ||
+ | self.checkListOfRecords(listOfRecords,'City',fixDates=True,doClose=False) | ||
+ | backupDB="/tmp/testSqlite.db" | ||
+ | self.sqlDB.backup(backupDB,profile=True,showProgress=200) | ||
+ | size=os.stat(backupDB).st_size | ||
+ | print ("size of backup DB is %d" % size) | ||
+ | self.assertTrue(size>600000) | ||
+ | self.sqlDB.close() | ||
+ | # restore | ||
+ | ramDB=SQLDB.restore(backupDB, SQLDB.RAM, profile=True) | ||
+ | entityInfo=EntityInfo(listOfRecords[:50],'City',debug=True) | ||
+ | allCities=ramDB.queryAll(entityInfo) | ||
+ | self.assertEqual(len(allCities),len(listOfRecords)) | ||
+ | |||
+ | def testCopy(self): | ||
+ | ''' | ||
+ | test copying databases into another database | ||
+ | ''' | ||
+ | dbFile="/tmp/DAWT_Sample3x1000.db" | ||
+ | copyDB=SQLDB(dbFile) | ||
+ | for sampleNo in range(3): | ||
+ | listOfRecords=Sample.getSample(1000) | ||
+ | self.checkListOfRecords(listOfRecords, 'Sample_%d_1000' %sampleNo, 'pKey',doClose=False) | ||
+ | self.sqlDB.copyTo(copyDB) | ||
+ | size=os.stat(dbFile).st_size | ||
+ | print ("size of copy DB is %d" % size) | ||
+ | self.assertTrue(size>70000) | ||
+ | tableList=copyDB.getTableList() | ||
+ | print(tableList) | ||
+ | for sampleNo in range(3): | ||
+ | self.assertEqual('Sample_%d_1000' %sampleNo,tableList[sampleNo]['name']) | ||
+ | |||
+ | |||
+ | if __name__ == "__main__": | ||
+ | #import sys;sys.argv = ['', 'Test.testSqllit3'] | ||
+ | unittest.main() | ||
</source> | </source> | ||
== SPARQL unit test == | == SPARQL unit test == |
Revision as of 08:04, 22 September 2020
OsProject
OsProject | |
---|---|
edit | |
id | PyLodStorage |
state | |
owner | Wolfgang Fahl |
title | python List of Dict (Table) Storage library |
url | https://github.com/WolfgangFahl/pyLodStorage |
version | 0.0.12 |
description | |
date | 2020/09/21 |
since | |
until |
see also DgraphAndWeaviateTest
List of Dicts = Table
a list of dicts(Hashtables) in python can be interpreted as Table which is suitable to be stored
- in a relational database like sqlite3
- as JSON
Royals example
@staticmethod
def getRoyals():
listOfDicts=[
{'name': 'Elizabeth Alexandra Mary Windsor', 'born': Sample.dob('1926-04-21'), 'numberInLine': 0, 'wikidataurl': 'https://www.wikidata.org/wiki/Q9682' },
{'name': 'Charles, Prince of Wales', 'born': Sample.dob('1948-11-14'), 'numberInLine': 1, 'wikidataurl': 'https://www.wikidata.org/wiki/Q43274' },
{'name': 'George of Cambridge', 'born': Sample.dob('2013-07-22'), 'numberInLine': 3, 'wikidataurl': 'https://www.wikidata.org/wiki/Q1359041'},
{'name': 'Harry Duke of Sussex', 'born': Sample.dob('1984-09-15'), 'numberInLine': 6, 'wikidataurl': 'https://www.wikidata.org/wiki/Q152316'}
]
today=date.today()
for person in listOfDicts:
born=person['born']
age=(today - born).days / 365.2425
person['age']=age
person['ofAge']=age>=18
person['lastmodified']=datetime.now()
return listOfDicts
The above list of dict can be stored in a Person table with the following structure:
SQL
The idea is to derive the necessary DDL and SQL command automatically:
CREATE TABLE Family(name TEXT PRIMARY KEY,country TEXT,lastmodified TIMESTAMP)
INSERT INTO Family (name,country,lastmodified) values (:name,:country,:lastmodified)
and use them via simple API
from lodstorage.sample import Sample
from lodstorage.sql import SQLDB, EntityInfo
listOfRecords=Sample.getRoyals()
sqlDB=SQLDB()
entityName='Person'
primaryKey='name'
entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityName,primaryKey)
sqlDB.store(listOfRecords,entityInfo)
resultList=self.sqlDB.queryAll(entityInfo)
The resultList will be the same as the original listOfRecords.
JSON
Apache Jena
The jena -l and jena -f options will automatically download and unpack the needed Apache jena files.
Jena load example dataset
scripts/jena -l sampledata/example.ttl
Jena fuseki server start
scripts/jena -f example
You should be able to browse the admin GUI at http://localhost:3030 and have the example dataset ready for you
Jena fuseki server stop
scripts/jena -k
jena script usage
scripts/jena -h
scripts/jena [-f|--fuseki|-h|--help|-k|--kill|-l|--load]
-f | --fuseki [dataset]: download and start fuseki server with the given dataset
-h | --help: show this usage
-k | --kill: kill the running fuseki server
-l | --load [ttl file]: download jena / tdbloader and load given ttl file
Example Unit tests
SQLDB unit test
'''
Created on 2020-08-24
@author: wf
'''
import unittest
from datetime import datetime
import time
import os
import sys
from lodstorage.sample import Sample
from lodstorage.uml import UML
from lodstorage.sql import SQLDB, EntityInfo
class TestSQLDB(unittest.TestCase):
'''
Test the SQLDB database wrapper
'''
def setUp(self):
self.debug=True
pass
def tearDown(self):
pass
def checkListOfRecords(self,listOfRecords,entityName,primaryKey=None,executeMany=True,fixDates=False,debug=False,doClose=True):
'''
check the handling of the given list of Records
Args:
listOfRecords(list): a list of dicts that contain the data to be stored
entityName(string): the name of the entity type to be used as a table name
primaryKey(string): the name of the key / column to be used as a primary key
executeMany(boolean): True if executeMany mode of sqlite3 should be used
debug(boolean): True if debug information e.g. CREATE TABLE and INSERT INTO commands should be shown
doClose(boolean): True if the connection should be closed
'''
size=len(listOfRecords)
print("%s size is %d fixDates is: %r" % (entityName,size,fixDates))
self.sqlDB=SQLDB(debug=debug,errorDebug=True)
entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityName,primaryKey)
startTime=time.time()
self.sqlDB.store(listOfRecords,entityInfo,executeMany=executeMany)
elapsed=time.time()-startTime
print ("adding %d %s records took %5.3f s => %5.f records/s" % (size,entityName,elapsed,size/elapsed))
resultList=self.sqlDB.queryAll(entityInfo,fixDates=fixDates)
print ("selecting %d %s records took %5.3f s => %5.f records/s" % (len(resultList),entityName,elapsed,len(resultList)/elapsed))
if doClose:
self.sqlDB.close()
return resultList
def testEntityInfo(self):
'''
test creating entityInfo from the sample record
'''
listOfRecords=Sample.getRoyals()
entityInfo=EntityInfo(listOfRecords[:3],'Person','name',debug=True)
self.assertEqual("CREATE TABLE Person(name TEXT PRIMARY KEY,born DATE,numberInLine INTEGER,wikidataurl TEXT,age FLOAT,ofAge BOOLEAN,lastmodified TIMESTAMP)",entityInfo.createTableCmd)
self.assertEqual("INSERT INTO Person (name,born,numberInLine,wikidataurl,age,ofAge,lastmodified) values (:name,:born,:numberInLine,:wikidataurl,:age,:ofAge,:lastmodified)",entityInfo.insertCmd)
self.sqlDB=SQLDB(debug=self.debug,errorDebug=True)
entityInfo=self.sqlDB.createTable(listOfRecords[:10],entityInfo.name,entityInfo.primaryKey)
tableList=self.sqlDB.getTableList()
if self.debug:
print (tableList)
self.assertEqual(1,len(tableList))
personTable=tableList[0]
self.assertEqual("Person",personTable['name'])
self.assertEqual(7,len(personTable['columns']))
uml=UML()
plantUml=uml.tableListToPlantUml(tableList,packageName="Royals",withSkin=False)
if self.debug:
print(plantUml)
expected="""package Royals {
class Person << Entity >> {
age : FLOAT
born : DATE
lastmodified : TIMESTAMP
name : TEXT <<PK>>
numberInLine : INTEGER
ofAge : BOOLEAN
wikidataurl : TEXT
}
}
"""
self.assertEqual(expected,plantUml)
# testGeneralization
listOfRecords=[{'name': 'Royal family', 'country': 'UK', 'lastmodified':datetime.now()}]
entityInfo=self.sqlDB.createTable(listOfRecords[:10],'Family','name')
tableList=self.sqlDB.getTableList()
self.assertEqual(2,len(tableList))
uml=UML()
plantUml=uml.tableListToPlantUml(tableList,generalizeTo="PersonBase",withSkin=False)
print(plantUml)
expected='''class PersonBase << Entity >> {
lastmodified : TIMESTAMP
name : TEXT <<PK>>
}
class Person << Entity >> {
age : FLOAT
born : DATE
numberInLine : INTEGER
ofAge : BOOLEAN
wikidataurl : TEXT
}
class Family << Entity >> {
country : TEXT
}
PersonBase <|-- Person
PersonBase <|-- Family
'''
self.assertEqual(expected,plantUml)
def testUniqueConstraint(self):
'''
test for https://github.com/WolfgangFahl/pyLoDStorage/issues/4
sqlite3.IntegrityError: UNIQUE constraint failed: ... show debug info
'''
listOfDicts=[
{"name": "John Doe"},
{"name": "Frank Doe"},
{"name": "John Doe"},
{"name":"Tim Doe"}]
sqlDB=SQLDB(debug=self.debug,errorDebug=True)
entityInfo=sqlDB.createTable(listOfDicts[:10],'Does','name')
try:
sqlDB.store(listOfDicts,entityInfo,executeMany=False)
self.fail("There should be an exception")
except Exception as ex:
expected="""INSERT INTO Does (name) values (:name)
failed:UNIQUE constraint failed: Does.name
record #3={'name': 'John Doe'}"""
errMsg=str(ex)
self.assertEqual(expected,errMsg)
def testSqlite3(self):
'''
test sqlite3 with a few records from the royal family
'''
listOfRecords=Sample.getRoyals()
resultList=self.checkListOfRecords(listOfRecords, 'Person', 'name',debug=True)
if self.debug:
print(resultList)
self.assertEqual(listOfRecords,resultList)
def testBindingError(self):
'''
test list of Records with incomplete record leading to
"You did not supply a value for binding 2"
see https://bugs.python.org/issue41638
'''
listOfRecords=[{'name':'Pikachu', 'type':'Electric'},{'name':'Raichu' }]
for executeMany in [True,False]:
try:
self.checkListOfRecords(listOfRecords,'Pokemon','name',executeMany=executeMany)
self.fail("There should be an exception")
except Exception as ex:
if self.debug:
print(str(ex))
self.assertTrue('no value supplied for column' in str(ex))
def testListOfCities(self):
'''
test sqlite3 with some 120000 city records
'''
listOfRecords=Sample.getCities()
for fixDates in [True,False]:
retrievedList=self.checkListOfRecords(listOfRecords,'City',fixDates=fixDates)
self.assertEqual(len(listOfRecords),len(retrievedList))
def testQueryParams(self):
'''
test Query Params
'''
listOfDicts=[
{"city": "New York", "country": "US"},
{"city": "Amsterdam", "country": "NL"},
{"city": "Paris", "country": "FR"}]
sqlDB=SQLDB(debug=self.debug,errorDebug=True)
entityInfo=sqlDB.createTable(listOfDicts[:10],'cities','city')
sqlDB.store(listOfDicts,entityInfo,executeMany=False)
query="SELECT * from cities WHERE country in (?)"
params=('FR',)
frCities=sqlDB.query(query,params)
if self.debug:
print (frCities);
self.assertEqual([{'city': 'Paris', 'country': 'FR'}],frCities)
def testSqllite3Speed(self):
'''
test sqlite3 speed with some 100000 artificial sample records
consisting of two columns with a running index
'''
limit=100000
listOfRecords=Sample.getSample(limit)
self.checkListOfRecords(listOfRecords, 'Sample', 'pKey')
def testBackup(self):
'''
test creating a backup of the SQL database
'''
if sys.version_info >= (3, 7):
listOfRecords=Sample.getCities()
self.checkListOfRecords(listOfRecords,'City',fixDates=True,doClose=False)
backupDB="/tmp/testSqlite.db"
self.sqlDB.backup(backupDB,profile=True,showProgress=200)
size=os.stat(backupDB).st_size
print ("size of backup DB is %d" % size)
self.assertTrue(size>600000)
self.sqlDB.close()
# restore
ramDB=SQLDB.restore(backupDB, SQLDB.RAM, profile=True)
entityInfo=EntityInfo(listOfRecords[:50],'City',debug=True)
allCities=ramDB.queryAll(entityInfo)
self.assertEqual(len(allCities),len(listOfRecords))
def testCopy(self):
'''
test copying databases into another database
'''
dbFile="/tmp/DAWT_Sample3x1000.db"
copyDB=SQLDB(dbFile)
for sampleNo in range(3):
listOfRecords=Sample.getSample(1000)
self.checkListOfRecords(listOfRecords, 'Sample_%d_1000' %sampleNo, 'pKey',doClose=False)
self.sqlDB.copyTo(copyDB)
size=os.stat(dbFile).st_size
print ("size of copy DB is %d" % size)
self.assertTrue(size>70000)
tableList=copyDB.getTableList()
print(tableList)
for sampleNo in range(3):
self.assertEqual('Sample_%d_1000' %sampleNo,tableList[sampleNo]['name'])
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testSqllit3']
unittest.main()
SPARQL unit test
see https://github.com/WolfgangFahl/DgraphAndWeaviateTest/blob/master/tests/testJena.py
'''
Created on 2020-08-14
@author: wf
'''
import unittest
import getpass
from lodstorage.sparql import SPARQL
from lodstorage.sample import Sample
import time
class TestSPARQL(unittest.TestCase):
''' Test SPARQL access e.g. Apache Jena via Wrapper'''
def setUp(self):
self.debug=False
pass
def tearDown(self):
pass
def getJena(self,mode='query',debug=False,typedLiterals=False,profile=False):
'''
get the jena endpoint for the given mode
Args:
mode(string): query or update
debug(boolean): True if debug information should be output
typedLiterals(boolean): True if INSERT DATA SPARQL commands should use typed literals
profile(boolean): True if profile/timing information should be shown
'''
endpoint="http://localhost:3030/example"
jena=SPARQL(endpoint,mode=mode,debug=debug,typedLiterals=typedLiterals,profile=profile)
return jena
def testJenaQuery(self):
'''
test Apache Jena Fuseki SPARQL endpoint with example SELECT query
'''
jena=self.getJena()
queryString = "SELECT * WHERE { ?s ?p ?o. }"
results=jena.query(queryString)
self.assertTrue(len(results)>20)
pass
def testJenaInsert(self):
'''
test a Jena INSERT DATA
'''
jena=self.getJena(mode="update")
insertCommands = [ """
PREFIX cr: <http://cr.bitplan.com/>
INSERT DATA {
cr:version cr:author "Wolfgang Fahl".
}
""",'INVALID COMMAND']
for index,insertCommand in enumerate(insertCommands):
result,ex=jena.insert(insertCommand)
if index==0:
self.assertTrue(ex is None)
print(result)
else:
msg=ex.args[0]
self.assertTrue("QueryBadFormed" in msg)
self.assertTrue("Error 400" in msg)
pass
def checkErrors(self,errors,expected=0):
'''
check the given list of errors - print any errors if there are some
and after that assert that the length of the list of errors is zero
Args:
errors(list): the list of errors to check
'''
if len(errors)>0:
print("ERRORS:")
for error in errors:
print(error)
self.assertEquals(expected,len(errors))
def testDob(self):
'''
test the DOB (date of birth) function that converts from ISO-Date to
datetime.date
'''
dt=Sample.dob("1926-04-21")
self.assertEqual(1926,dt.year)
self.assertEqual(4,dt.month)
self.assertEqual(21,dt.day)
def testListOfDictInsert(self):
'''
test inserting a list of Dicts and retrieving the values again
using a person based example
instead of
https://en.wikipedia.org/wiki/FOAF_(ontology)
we use an object oriented derivate of FOAF with a focus on datatypes
'''
listofDicts=Sample.getRoyals()
typedLiteralModes=[True,False]
entityType='foafo:Person'
primaryKey='name'
prefixes='PREFIX foafo: <http://foafo.bitplan.com/foafo/0.1/>'
for typedLiteralMode in typedLiteralModes:
jena=self.getJena(mode='update',typedLiterals=typedLiteralMode,debug=True)
deleteString= """
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX foafo: <http://foafo.bitplan.com/foafo/0.1/>
DELETE WHERE {
?person a 'foafo:Person'.
?person ?p ?o.
}
"""
jena.query(deleteString)
errors=jena.insertListOfDicts(listofDicts,entityType,primaryKey,prefixes)
self.checkErrors(errors)
jena=self.getJena(mode="query",debug=True)
queryString = """
PREFIX foafo: <http://foafo.bitplan.com/foafo/0.1/>
SELECT ?name ?born ?numberInLine ?wikidataurl ?age ?ofAge ?lastmodified WHERE {
?person a 'foafo:Person'.
?person foafo:Person_name ?name.
?person foafo:Person_born ?born.
?person foafo:Person_numberInLine ?numberInLine.
?person foafo:Person_wikidataurl ?wikidataurl.
?person foafo:Person_age ?age.
?person foafo:Person_ofAge ?ofAge.
?person foafo:Person_lastmodified ?lastmodified.
}"""
personResults=jena.query(queryString)
self.assertEqual(len(listofDicts),len(personResults))
personList=jena.asListOfDicts(personResults)
for index,person in enumerate(personList):
print("%d: %s" %(index,person))
# check the correct round-trip behavior
self.assertEqual(listofDicts,personList)
def testControlEscape(self):
'''
check the control-escaped version of an UTF-8 string
'''
controls="Α\tΩ\r\n";
expected="Α\\tΩ\\r\\n"
esc=SPARQL.controlEscape(controls)
self.assertEqual(expected,esc)
def testSPARQLErrorMessage(self):
'''
test error handling
see https://stackoverflow.com/questions/63486767/how-can-i-get-the-fuseki-api-via-sparqlwrapper-to-properly-report-a-detailed-err
'''
listOfDicts=[{
'title': '“Bioinformatics of Genome Regulation and Structure\Systems Biology” – BGRS\SB-2018',
'url': 'https://thenode.biologists.com/event/11th-international-multiconference-bioinformatics-genome-regulation-structuresystems-biology-bgrssb-2018/'}]
entityType="cr:Event"
primaryKey='title'
prefixes="PREFIX cr: <http://cr.bitplan.com/Event/0.1/>"
jena=self.getJena(mode='update',typedLiterals=False,debug=True)
errors=jena.insertListOfDicts(listOfDicts,entityType,primaryKey,prefixes)
self.checkErrors(errors,1)
error=errors[0]
self.assertTrue("probably the sparql query is bad formed" in error)
def testEscapeStringContent(self):
'''
test handling of double quoted strings
'''
helpListOfDicts=[{'topic':'edit','description': '''Use
the "edit"
button to start editing - you can use
- tab \t
- carriage return \r
- newline \n
as escape characters
'''
}]
entityType='help:Topic'
primaryKey='topic'
prefixes='PREFIX help: <http://help.bitplan.com/help/0.0.1/>'
jena=self.getJena(mode='update',debug=True)
errors=jena.insertListOfDicts(helpListOfDicts, entityType, primaryKey, prefixes, profile=True)
self.checkErrors(errors)
query="""
PREFIX help: <http://help.bitplan.com/help/0.0.1/>
SELECT ?topic ?description
WHERE {
?help help:Topic_topic ?topic.
?help help:Topic_description ?description.
}
"""
jena=self.getJena(mode='query')
listOfDicts=jena.queryAsListOfDicts(query)
# check round trip equality
self.assertEqual(helpListOfDicts,listOfDicts)
def testIssue7(self):
'''
test conversion of dates with timezone info
'''
value="2020-01-01T00:00:00Z"
dt=SPARQL.strToDatetime(value)
self.assertEqual(dt.year,2020)
def testListOfDictSpeed(self):
'''
test the speed of adding data
'''
limit=5000
for batchSize in [None,1000]:
listOfDicts=Sample.getSample(limit)
jena=self.getJena(mode='update',profile=True)
entityType="ex:TestRecord"
primaryKey='pkey'
prefixes='PREFIX ex: <http://example.com/>'
startTime=time.time()
errors=jena.insertListOfDicts(listOfDicts, entityType, primaryKey, prefixes,batchSize=batchSize)
self.checkErrors(errors)
elapsed=time.time()-startTime
print ("adding %d records took %5.3f s => %5.f records/s" % (limit,elapsed,limit/elapsed))
def testLocalWikdata(self):
'''
check local wikidata
'''
# check we have local wikidata copy:
if getpass.getuser()=="wf":
# use 2018 wikidata copy
endpoint="http://jena.zeus.bitplan.com/wikidata/"
wd=SPARQL(endpoint)
queryString="""# get a list of whisky distilleries
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX wdt: <http://www.wikidata.org/prop/direct/>
SELECT ?item ?coord
WHERE
{
# instance of whisky distillery
?item wdt:P31 wd:Q10373548.
# get the coordinate
?item wdt:P625 ?coord.
}
"""
results=wd.query(queryString)
self.assertTrue(238<=len(results))
if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName']
unittest.main()