import json
import os
import string
import random
import hashlib
import shutil
from typing import List
import sqlite3
from libreary.adapters.BaseAdapter import BaseAdapter
from libreary.adapters.local import LocalAdapter
from libreary.adapters.s3 import S3Adapter
from libreary.adapters.drive import GoogleDriveAdapter
from libreary.exceptions import ResourceNotIngestedException, ChecksumMismatchException, NoCopyExistsException
from libreary.exceptions import RestorationFailedException, AdapterCreationFailedException, AdapterRestored
[docs]class AdapterManager:
"""
The AdapterManager is responsible for all interaction with adapters, except for initial ingestion.
It is able to keep track of all of the adapters we have, do integrity checks on them,
perform initial distribution, compare versions from different adapters, and make insert and delete calls.
The Adapter Manager is responsible for most of the operation of ingestion, deletion, and management of
digital objects within LIBREary. Most customization will occur by subclassing the AdapterManager.
This class currently contains the following methods:
- reload_levels_adapters (create adapter objects and set levels based on configuration)
- get_all_levels (get levels based on what exists in metadata db)
- get_all_adapters (create adapter objects based on levels)
- set_additional_adapter (manually create an adapter object and add it to the AdapterManager's list of adapters)
- verify_adapter (make sure an adapter is working properly)
- create_adapter [static method] (factory function for adapter objects)
- create_config_for_adapter [static method] (build the configuration dict an adapter is created from)
- send_resource_to_adapters (send copies of a resource to all the places they need to be)
- get_adapters_by_level (get all adapters from a level)
- delete_resource_from_adapters (delete non-canonical copies of an object)
- change_resource_level (change the level of an object)
- get_canonical_copy_metadata
- summarize_copies
- retrieve_by_preference (retrieve an object, preferring the canonical adapter)
- check_single_resource_single_adapter (make sure that a copy matches the canonical copy)
- verify_adapter_metadata (verify that a file can be retrieved)
- get_resource_metadata
- restore_canonical_copy (restore a faulty canonical copy)
- restore_from_canonical_copy (delete and re-store a faulty copy held by one adapter)
- compare_copies (check if two copies of a resource are the same)
- verify_copy (check if a copy matches the canonical copy)
"""
def __init__(self, config: dict):
    """
    Build an AdapterManager from a LIBREary configuration dictionary.

    The manager is normally created by the LIBRE-ary main object, but it
    can also be constructed directly.

    :param config - dictionary structured as follows:
    ```{json}
    {
        "metadata": {
            "db_file": "path to SQLite3 DB file for metadata"
        },
        "adapters": # List of adapters - each entry should look like:
            [{
                "type":"AdapterType (name of class)",
                "id": "Adapter Identifier"
            }],
        "options": {
            "dropbox_dir": "Path to dropbox directory, where files you want to ingest should be placed",
            "output_dir": "Path to directory you want files to be retrieved to",
            "config_dir": "Path to config directory"
        },
        "canonical_adapter":"Adapter Identifier for Canonical Adapter"
    }
    ```
    """
    self.config = config
    self.all_adapters = config["adapters"]

    # Metadata database: resolve the path once and keep one connection
    # and one cursor open on the instance.
    db_file = config["metadata"].get("db_file")
    self.metadata_db = os.path.realpath(db_file)
    self.conn = sqlite3.connect(self.metadata_db)
    self.cursor = self.conn.cursor()

    # Working directories.
    options = config["options"]
    self.dropbox_dir = options["dropbox_dir"]
    self.ret_dir = options["output_dir"]
    self.config_dir = options["config_dir"]

    self.levels = {}
    self.adapters = {}
    self.canonical_adapter = config["canonical_adapter"]

    # Populate self.levels and self.adapters. Call reload_levels_adapters()
    # again from outside whenever a level is added after construction.
    self.reload_levels_adapters()
def reload_levels_adapters(self) -> None:
    """
    Refresh the `self.levels` and `self.adapters` instance variables.

    Adapter objects are cached on the manager because an adapter may need
    time-sensitive authentication information (tokens, etc) or may be
    computationally expensive to create.
    """
    # Order matters: the adapters are derived from the levels.
    self._set_levels()
    self._set_adapters()
def get_all_levels(self) -> dict:
    """
    Return every level stored in the metadata database.

    The result is a dictionary keyed by level name (not a list); each value
    has the following format:
    ```
    {"id": (int) level ID,
    "name": (str) level name,
    "frequency": (int) scheduled check frequency,
    "adapters": JSON-decoded contents of the level's adapters column}
    ```
    Columns are read by position from `select * from levels`:
    id, name, frequency, adapters (JSON text).
    """
    rows = self.cursor.execute("select * from levels").fetchall()
    return {
        row[1]: {
            "id": row[0],
            "name": row[1],
            "frequency": row[2],
            "adapters": json.loads(row[3]),
        }
        for row in rows
    }
def _set_levels(self) -> None:
    """
    Reload `self.levels` from the output of `get_all_levels`.
    """
    refreshed_levels = self.get_all_levels()
    self.levels = refreshed_levels
def get_all_adapters(self) -> dict:
    """
    Create every adapter referenced by the levels in `self.levels`.

    The return value is a dictionary (not a list) structured as follows:
    ```
    {
        "adapter_id1": <AdapterObject>,
        "adapter_id2": <AdapterObject>,...
    }
    ```
    An adapter that appears in several levels is created only once, because
    adapter construction may be expensive.

    Ensure that `self.levels` is set properly before running this, by
    calling `self._set_levels()`
    """
    adapters = {}
    for level in self.levels.values():
        # Each level may need several adapters
        for adapter in level["adapters"]:
            adapter_id = adapter["id"]
            if adapter_id in adapters:
                # Already built for an earlier level; reuse it.
                continue
            adapters[adapter_id] = self.create_adapter(
                adapter["type"], adapter_id, self.config_dir)
    return adapters
def _set_adapters(self) -> None:
    """
    Reload `self.adapters` from the output of `get_all_adapters`.

    After this call `self.adapters` is a dictionary structured as follows:
    ```
    {
        "adapter_id1": <AdapterObject>,
        "adapter_id2": <AdapterObject>,...
    }
    ```
    `self.levels` must already be populated (see `self._set_levels()`).
    """
    refreshed_adapters = self.get_all_adapters()
    self.adapters = refreshed_adapters
def set_additional_adapter(self, adapter_id: str,
                           adapter_type: str) -> BaseAdapter:
    """
    Create one adapter by hand and register it in `self.adapters`.

    :param adapter_id - the adapter ID of the adapter you're creating
        There should be a matching config file for this adapter ID
    :param adapter_type - the type of the adapter you wish to create.
        Must be the actual class name, i.e. "LocalAdapter".

    Returns the newly created adapter object.
    """
    new_adapter = self.create_adapter(
        adapter_type, adapter_id, self.config_dir)
    # Replaces any adapter previously registered under the same ID.
    self.adapters[adapter_id] = new_adapter
    return new_adapter
def verify_adapter(self, adapter_id: str) -> bool:
    """
    Make sure an adapter is working. To do this, we store, retrieve,
    and delete a file that we know the contents of,
    and make sure the checksums are as they should be.

    Returns True iff the retrieved file has the same SHA-1 checksum as the
    file that was stored. The temporary dropbox file, the retrieved file,
    the stored canonical copy and the temporary `resources` row are cleaned
    up even when a step in between raises; the exception then propagates.

    :param adapter_id - The adapter ID you'd like to verify.
    """
    dropbox_path = "{}/libreary_test_file.txt".format(
        self.config["options"]["dropbox_dir"])
    adapter = self.adapters[adapter_id]
    r_id = "LIBREARY_TEST_RESORURCE"

    data_to_store = "".join(
        random.choice(string.ascii_letters) for _ in range(500))
    with open(dropbox_path, "w") as fh:
        fh.write(data_to_store)

    new_path = None
    try:
        with open(dropbox_path, "rb") as fh:
            real_checksum = hashlib.sha1(fh.read()).hexdigest()

        # To circumvent the full ingestion process, we call _store_canonical
        # directly. Not recommended for end users to do this.
        locator = adapter._store_canonical(
            dropbox_path,
            r_id,
            real_checksum,
            "libreary_test_resource.txt")
        try:
            # Do minimal ingestion so that adapter knows what it needs to
            # retrieve
            self.cursor.execute(
                "insert into resources values (?, ?, ?, ?, ?, ?, ?)",
                (None, locator, "low,", "libreary_test_file.txt",
                 real_checksum, r_id,
                 "A resource for testing LIBREary adapters with"))
            self.conn.commit()

            new_path = adapter.retrieve(r_id)
            with open(new_path, "rb") as fh:
                new_checksum = hashlib.sha1(fh.read()).hexdigest()
            return new_checksum == real_checksum
        finally:
            # Undo the store and the minimal ingestion, in that order; the
            # row is still present while the adapter deletes its copy.
            try:
                adapter._delete_canonical(r_id)
            finally:
                self.cursor.execute(
                    "delete from resources where uuid=?", (r_id,))
                self.conn.commit()
    finally:
        # Best-effort removal of the scratch files; they may already be
        # gone (e.g. moved or deleted by the adapter).
        for leftover in (new_path, dropbox_path):
            if leftover is None:
                continue
            try:
                os.remove(leftover)
            except OSError:
                pass
@staticmethod
def create_adapter(adapter_type: str, adapter_id: str,
                   config_dir: str) -> BaseAdapter:
    """
    Static method for creating and returning an adapter object.
    This is essentially an Adapter factory.

    The adapter class is looked up by name among the names available in
    this module and called with the adapter's configuration dictionary.
    (This replaces an `eval` of a formatted string, which executed whatever
    expression the type name contained and round-tripped the configuration
    through its `repr`.)

    :param adapter_type - must be the name of a valid adapter class.
    :param adapter_id - the identifier you want to label this adapter with
    :param config_dir - configuration directory. Must contain a file called
        `{adapter_id}_config.json`

    Raises AdapterCreationFailedException if `adapter_type` does not name a
    class available in this module.
    """
    cfg = AdapterManager.create_config_for_adapter(
        adapter_id, adapter_type, config_dir)
    adapter_class = globals().get(adapter_type)
    if not isinstance(adapter_class, type):
        # Chain a descriptive cause without assuming anything about the
        # project exception's constructor signature.
        raise AdapterCreationFailedException from LookupError(
            "unknown adapter type: {!r}".format(adapter_type))
    return adapter_class(cfg)
@staticmethod
def create_config_for_adapter(
        adapter_id: str, adapter_type: str, config_dir: str) -> dict:
    """
    Static method for creating an adapter configuration. This is necessary for
    the adapter factory.

    Reads `{config_dir}/{adapter_id}_config.json` and `{config_dir}/config.json`
    and returns a dictionary with three keys:
    - "adapter": the "adapter" section of the adapter's own config file, with
      "adapter_type" set to :param adapter_type
    - "metadata": the "metadata" section of the general config file
    - "options": the "options" section of the general config file

    :param adapter_id - the identifier you want to label this adapter with
    :param adapter_type - must be the name of a valid adapter class.
    :param config_dir - configuration directory. Must contain a file called
        `{adapter_id}_config.json`
    """
    # Context managers make sure both config files are closed again.
    with open("{}/{}_config.json".format(config_dir, adapter_id)) as fh:
        base_config = json.load(fh)
    with open("{}/config.json".format(config_dir)) as fh:
        general_config = json.load(fh)

    adapter_section = base_config["adapter"]
    adapter_section["adapter_type"] = adapter_type
    return {
        "adapter": adapter_section,
        "metadata": general_config["metadata"],
        "options": general_config["options"],
    }
def send_resource_to_adapters(
        self, r_id: str, delete_after_send: bool = False) -> None:
    """
    Sends a resource to all the places it should go. The resource must
    have already been ingested through the Ingester. This method:
    1. Figures out what levels a resource has been assigned
    2. Figures out what adapters are associated with those levels
    3. Figures out any overlap, to avoid storing things twice in one adapter
    4. Stores copies to each adapter
    5. optionally, deletes any remaining files in the dropbox directory

    Empty entries in the comma-separated levels string (for example from a
    trailing comma such as "low,") are ignored.

    :param r_id - resource UUID you wish to distribute
    :param delete_after_send - boolean indicating whether to delete
        files after storage

    Raises ResourceNotIngestedException if no metadata exists for r_id.
    """
    try:
        resource_metadata = self.get_resource_metadata(r_id)[0]
    except IndexError:
        raise ResourceNotIngestedException

    # Make sure that resource is in dropbox, with the expected checksum:
    filename = resource_metadata[3]
    expected_location = "{}/{}".format(self.dropbox_dir, filename)
    file_there = False
    if os.path.isfile(expected_location):
        with open(expected_location, "rb") as fh:
            file_hash = hashlib.sha1(fh.read()).hexdigest()
        file_there = file_hash == resource_metadata[4]

    # If the file isn't where we want it, put it there
    if not file_there:
        # retrieve moves it to the retrieval dir
        current_path = self.adapters[self.canonical_adapter].retrieve(r_id)
        # so we move it to the dropbox_dir
        shutil.move(current_path, expected_location)

    # Collect each adapter once, in first-seen order, even when several of
    # the resource's levels share it.
    targets = []
    for level in resource_metadata[2].split(","):
        level = level.strip()
        if not level:
            continue
        for adapter in self.get_adapters_by_level(level):
            if not any(adapter is seen for seen in targets):
                targets.append(adapter)

    for adapter in targets:
        adapter.store(r_id)

    if delete_after_send:
        os.remove(expected_location)
def get_adapters_by_level(self, level: str) -> List[BaseAdapter]:
    """
    Look up the adapter objects that belong to one level.

    Returns the adapter objects from `self.adapters`, in the order the
    level lists them.

    :param level - the name of the level you want the adapters for
    """
    level_entries = self.levels[level]["adapters"]
    return [self.adapters[entry["id"]] for entry in level_entries]
def delete_resource_from_adapters(self, r_id: str) -> None:
    """
    Delete a resource from every adapter attached to its levels.

    Calls `delete(r_id)` once on each adapter, even when several of the
    resource's levels share that adapter. Empty entries in the
    comma-separated levels string (for example from a trailing comma such as
    "low,") are ignored. The adapters' `_delete_canonical` is never called
    here, so the canonical copy is left alone.

    :param r_id - UUID of resource to delete copies of

    Raises ResourceNotIngestedException if no metadata exists for r_id.
    """
    try:
        resource_metadata = self.get_resource_metadata(r_id)[0]
    except IndexError:
        raise ResourceNotIngestedException

    # Collect each adapter once, in first-seen order.
    targets = []
    for level in resource_metadata[2].split(","):
        level = level.strip()
        if not level:
            continue
        for adapter in self.get_adapters_by_level(level):
            if not any(adapter is seen for seen in targets):
                targets.append(adapter)

    for adapter in targets:
        adapter.delete(r_id)
def change_resource_level(self, r_id: str, new_levels: List[str]) -> None:
    """
    Assign a new set of levels to a resource.
    Removes all levels from a resource, replaces them with :param new_levels

    The steps are: delete the copies held for the old levels, write the new
    comma-separated levels string to the `resources` table, reload levels
    and adapters, and distribute the resource again.

    :param r_id - UUID of resource you'd like to change the levels of
    :param new_levels: list of names of levels to assign to the resource
    """
    # Because d_r_f_a doesn't delete canonical copy, we can simply use
    # it to reset
    self.delete_resource_from_adapters(r_id)

    # The placeholders must not be quoted ('?' is a literal question mark),
    # and the parameters must follow placeholder order: levels, then uuid.
    sql = "update resources set levels = ? where uuid = ?"
    self.cursor.execute(sql, (",".join(new_levels), r_id))
    self.conn.commit()

    # now that we've updated the levels, we refresh levels
    self.reload_levels_adapters()
    # now, we can just act as if it has never been sent off:
    self.send_resource_to_adapters(r_id)
def summarize_copies(self, r_id: str) -> List[List[str]]:
    """
    Get a summary of all copies of a single resource. That summary includes:
    `copy_id`, `resource_id`, `adapter_identifier`, `locator`, `checksum`, `adapter type`, `canonical (bool)`
    for each copy

    Returns every row of the `copies` table whose `resource_id` equals
    :param r_id, as returned by `fetchall()`.

    This method trusts the metadata database. There should be a separate method to
    verify the metadata db so that we know we can trust this info

    :param r_id - UUID of resource you'd like to learn about
    """
    # Bind r_id as a parameter instead of formatting it into the SQL text,
    # so quotes in the identifier cannot change the statement.
    sql = "select * from copies where resource_id = ?"
    return self.cursor.execute(sql, (r_id,)).fetchall()
def retrieve_by_preference(self, r_id: str) -> str:
    """
    Retrieve a resource.
    Get a copy of a file, preferring the canonical adapter and then falling
    back to the remaining adapters in the order of `self.adapters`.
    This will be called when Libreary is asked to retrieve.
    Returns the path that the adapter's `retrieve` call returned.
    Keep in mind that the output directory may be volatile and should not be used for storage.

    When the canonical copy fails its checksum, a restoration of the
    canonical copy is attempted before the fallback. When another adapter's
    copy fails its checksum, that copy is restored from the canonical copy
    and the next adapter is tried.

    :param r_id - UUID of resource you'd like to retrieve

    Raises NoCopyExistsException if no adapter could deliver the resource
    (previously the method silently returned None in that case).
    """
    # First, try the canonical copy:
    try:
        return self.adapters[self.canonical_adapter].retrieve(r_id)
    except ChecksumMismatchException:
        print("Canonical Recovery Failed. Attempting to Restore Canonical Copy")
        self.restore_canonical_copy(r_id)

    # Iterate with the dictionary key: it is the identifier that
    # restore_from_canonical_copy uses to index self.adapters.
    for adapter_id, adapter in self.adapters.items():
        try:
            return adapter.retrieve(r_id)
        except ChecksumMismatchException:
            print(
                "Retrieval from adapter {} failed. "
                "Attempting to restore it from the canonical copy".format(
                    adapter_id))
            self.restore_from_canonical_copy(adapter_id, r_id)

    raise NoCopyExistsException
def check_single_resource_single_adapter(
        self, r_id: str, adapter_id: str) -> bool:
    """
    Ensure that a copy of an object matches its canonical checksum.
    This method trusts that the metadata db has the proper canonical checksum.
    If a copy is found to be faulty, a restore is attempted.
    If a copy is found to not exist (or the restore failed), it is created
    by storing the resource on the adapter.

    Copy rows come from `summarize_copies`; column 2 is the adapter
    identifier and column 4 the checksum recorded for the copy.

    :param r_id - UUID of resource you'd like to check
    :param adapter_id - adapter_id for copy of resource you are checking

    Returns True if the adapter holds (or now holds) a copy, False if no
    copy could be created because the adapter is unknown or could not be
    constructed.
    Raises ResourceNotIngestedException if no metadata exists for r_id.
    """
    try:
        resource_info = self.get_resource_metadata(r_id)[0]
    except IndexError:
        raise ResourceNotIngestedException
    canonical_checksum = resource_info[4]

    found = False
    for copy in self.summarize_copies(r_id):
        if copy[2] != adapter_id:
            continue
        found = True
        if copy[4] != canonical_checksum:
            try:
                self.restore_from_canonical_copy(adapter_id, r_id)
            except RestorationFailedException:
                print(
                    "Restoration of {} in {} failed".format(
                        r_id, adapter_id))
                found = False

    if found:
        return True

    # No usable copy on this adapter: store a fresh one.
    adapter = self.adapters.get(adapter_id)
    if adapter is None:
        # Not instantiated yet; its type comes from the "adapters" section
        # of the configuration.
        adapter_type = next(
            (entry["type"] for entry in self.all_adapters
             if entry["id"] == adapter_id),
            None)
        if adapter_type is None:
            return False
        try:
            adapter = self.create_adapter(
                adapter_type, adapter_id, self.config_dir)
        except AdapterCreationFailedException:
            return False
    adapter.store(r_id)
    return True
def restore_canonical_copy(self, r_id: str) -> None:
    """
    Attempt to Restore a detected fault in the canonical copy of an object.

    A retrievable copy is first searched for among the adapters of the
    resource's levels (skipping the canonical adapter itself). Only once
    such a copy has been retrieved is the canonical copy deleted and a new
    one stored from it, preserving the resource UUID, checksum and filename
    recorded in the metadata. Non-canonical copies are kept.

    Empty entries in the comma-separated levels string are ignored.

    :param r_id - UUID of resource you'd like to restore

    Raises ResourceNotIngestedException if no metadata exists for r_id.
    Raises RestorationFailedException if no adapter could deliver a copy;
    the existing canonical copy is left untouched in that case.
    """
    try:
        resource_info = self.get_resource_metadata(r_id)[0]
    except IndexError:
        raise ResourceNotIngestedException
    real_checksum = resource_info[4]
    filename = resource_info[3]
    levels = [
        level.strip()
        for level in resource_info[2].split(",")
        if level.strip()
    ]

    canonical = self.adapters[self.canonical_adapter]

    # Lazily walk the adapters level by level and stop at the first one
    # that can hand back the resource.
    candidates = (
        adapter
        for level in levels
        for adapter in self.get_adapters_by_level(level)
    )
    current_location = None
    for adapter in candidates:
        if adapter is canonical:
            continue
        try:
            current_location = adapter.retrieve(r_id)
        except (ResourceNotIngestedException,
                ChecksumMismatchException,
                NoCopyExistsException):
            continue
        break

    if current_location is None:
        raise RestorationFailedException

    # Replace the faulty canonical copy with the retrieved one.
    canonical._delete_canonical(r_id)
    canonical._store_canonical(
        current_location, r_id, real_checksum, filename)
def restore_from_canonical_copy(self, adapter_id: str, r_id: str) -> None:
    """
    Replace one adapter's copy of a resource.

    The adapter's copy is deleted and then stored again through the
    adapter's own `delete` and `store` calls.

    :param adapter_id - the ID of the adapter with the broken copy
    :param r_id - The resource UUID of the resource we've detected an issue with
    """
    # Drop the faulty copy first, then ingest it again.
    for operation in ("delete", "store"):
        getattr(self.adapters[adapter_id], operation)(r_id)
def compare_copies(self, r_id: str, adapter_id_1: str,
                   adapter_id_2: str, deep: bool = False) -> bool:
    """
    Tell whether two adapters hold matching copies of a resource.

    A shallow compare (the default) checks the checksums recorded for the
    two copies in the `copies` table. A deep compare asks each adapter for
    `get_actual_checksum(r_id)` and compares those; depending on the adapter
    this may or may not require retrieving the file.

    :param r_id - the UUID of the resource to compare
    :param adapter_id_1 - Adapter ID of the first adapter
    :param adapter_id_2 - Adapter ID of the second adapter
    :param deep - specify whether to run a deep or shallow check

    Raises NoCopyExistsException if either adapter has no copy on record.
    """
    lookup = "select * from copies where resource_id=? and adapter_identifier=? limit 1"
    try:
        first_copy = self.cursor.execute(
            lookup, (r_id, adapter_id_1)).fetchall()[0]
        second_copy = self.cursor.execute(
            lookup, (r_id, adapter_id_2)).fetchall()[0]
    except IndexError:
        raise NoCopyExistsException

    if deep:
        first_checksum = self.adapters[adapter_id_1].get_actual_checksum(r_id)
        second_checksum = self.adapters[adapter_id_2].get_actual_checksum(r_id)
        return first_checksum == second_checksum

    # Column 4 of a copies row is the recorded checksum.
    return first_copy[4] == second_copy[4]
def verify_copy(self, r_id: str, adapter_id: str,
                deep: bool = False) -> bool:
    """
    Check one adapter's copy of a resource against the canonical adapter's.

    Delegates to `compare_copies`, with the canonical adapter as the second
    adapter, and returns its result.

    :param r_id - the UUID of the resource to compare
    :param adapter_id - Adapter ID of the adapter to check against
    :param deep - specify whether to run a deep or shallow check
    """
    reference_adapter = self.canonical_adapter
    matches = self.compare_copies(
        r_id, adapter_id, reference_adapter, deep=deep)
    return matches