# Source code for libreary.adapters.drive

import sqlite3
import os
from shutil import copyfile
import hashlib
import json
import pickle
import io
from pathlib import Path
from typing import List


# The Google client libraries are optional dependencies. Record whether they
# could be imported so that GoogleDriveAdapter.__init__ can raise
# OptionalModuleMissingException instead of failing with a NameError later.
try:
    from googleapiclient.discovery import build
    from google_auth_oauthlib.flow import InstalledAppFlow
    from google.auth.transport.requests import Request
    from pydrive.auth import GoogleAuth
    from apiclient.http import MediaFileUpload, MediaIoBaseDownload

except ImportError:
    # At least one of the Google packages is missing; the adapter is unusable.
    _google_enabled = False
else:
    _google_enabled = True

from libreary.exceptions import ResourceNotIngestedException, ChecksumMismatchException, NoCopyExistsException, OptionalModuleMissingException
from libreary.exceptions import RestorationFailedException, AdapterCreationFailedException, AdapterRestored, StorageFailedException, ConfigurationError

# OAuth scopes passed to InstalledAppFlow in
# GoogleDriveAdapter.get_google_client when a new token has to be obtained.
SCOPES = ['https://www.googleapis.com/auth/drive']


class GoogleDriveAdapter:
    """
    Adapter that stores copies of digital objects in Google Drive.

    An Adapter allows LIBREary to save copies of digital objects
    to different places across cyberspace. Working with many
    adapters in concert, one should be able to save sufficient
    copies to the places they want them.
    """

    def __init__(self, config: dict):
        """
        Constructor for GoogleDriveAdapter.

        Connects to the metadata database, builds the Google Drive client
        and looks up (or creates) the Drive folder used for storage.

        :param config: a python dict in the following format:

        ```{json}
        {
            "metadata": {
                "db_file": "path to metadata db"
            },
            "adapter": {
                "folder_path": "Name of google drive folder for storage. LIBREary will create this folder if it does not exist",
                "adapter_identifier": "friendly identifier",
                "adapter_type": "GoogleDriveAdapter",
                "credentials_file": "Path to credentials file. See get_google_client docs for more",
                "token_file": "Path to place you want to save a token file"
            },
            "options": {
                "dropbox_dir": "path to dropbox directory",
                "output_dir": "path to output directory"
            },
            "canonical": "(boolean) true if this is the canonical adapter"
        }
        ```

        :raises OptionalModuleMissingException: if the Google client
            libraries could not be imported.
        :raises KeyError: if a required configuration key is missing.
        """
        # Fail before touching the database so that no sqlite connection is
        # left open when the optional Google modules are not installed.
        if not _google_enabled:
            raise OptionalModuleMissingException(
                ['googleapiclient'],
                "Google Drive adapter requires the googleapiclient module.")

        # Read every configuration value before opening the connection, so a
        # missing key cannot leak an open database handle either.
        self.metadata_db = os.path.realpath(config['metadata'].get("db_file"))
        self.adapter_id = config["adapter"]["adapter_identifier"]
        self.token_file = config["adapter"]["token_file"]
        self.dropbox_dir = config["options"]["dropbox_dir"]
        self.folder_path = config["adapter"]["folder_path"]
        # This value is written to the `adapter_type` column of `copies`.
        # It used to be "LocalAdapter", a copy-paste leftover.
        self.adapter_type = "GoogleDriveAdapter"
        self.ret_dir = config["options"]["output_dir"]
        self.credentials_file = config["adapter"]["credentials_file"]

        self.conn = sqlite3.connect(self.metadata_db)
        self.cursor = self.conn.cursor()

        self.get_google_client()
        self.dir_id = self._get_or_create_folder()

    @staticmethod
    def _sha1_of_file(path: str, chunk_size: int = 1 << 20) -> str:
        """
        Return the hex SHA-1 digest of the file at `path`.

        The file is read in chunks so that large objects are never held in
        memory all at once, and the handle is always closed.

        :param path: path of the file to hash
        :param chunk_size: number of bytes to read per iteration
        """
        digest = hashlib.sha1()
        with open(path, "rb") as handle:
            for chunk in iter(lambda: handle.read(chunk_size), b""):
                digest.update(chunk)
        return digest.hexdigest()

    def get_google_client(self) -> None:
        """
        Build a Google Drive client object and store it in `self.service`.

        Important to note that this uses an OAUTH flow, so you'll need to run
        it from a computer that has a web browser you can use.

        Store the creds JSON file in the place you note in
        `config["adapter"]["credentials_file"]`. A token will be stored in
        `config["adapter"]["token_file"]`.

        If you are running LIBREary on a headless server, I recommend getting
        a token first, and saving the token file on the server, so that you
        don't need to mess around with headless browsers etc.

        Get creds JSON file from here:
        https://developers.google.com/drive/api/v3/quickstart/python?authuser=3
        """
        creds = None
        # The token file stores the user's access and refresh tokens, and is
        # created automatically when the authorization flow completes for
        # the first time.
        if os.path.exists(self.token_file):
            # NOTE(review): pickle.load can execute arbitrary code. The token
            # file must only ever be written by this adapter and must not be
            # writable by untrusted users.
            with open(self.token_file, 'rb') as token:
                creds = pickle.load(token)

        # If there are no (valid) credentials available, let the user log in.
        if not creds or not creds.valid:
            if creds and creds.expired and creds.refresh_token:
                creds.refresh(Request())
            else:
                flow = InstalledAppFlow.from_client_secrets_file(
                    self.credentials_file, SCOPES)
                creds = flow.run_local_server(port=0)
            # Save the credentials for the next run
            with open(self.token_file, 'wb') as token:
                pickle.dump(creds, token)

        self.service = build('drive', 'v3', credentials=creds)

    def _list_objects(self) -> None:
        """
        Sanity-check method for devs to use. Prints up to 1000 items
        returned by a Drive file listing.
        """
        results = self.service.files().list(
            pageSize=1000, fields="nextPageToken, files(id, name)").execute()
        items = results.get('files', [])

        if not items:
            print('No files found.')
            return
        for item in items:
            print(item)

    def _get_or_create_folder(self) -> str:
        """
        Return the Drive id of the folder named in config, creating the
        folder if no such folder exists.

        Every result page is inspected and the first matching folder wins.
        Folders in the trash are ignored.
        """
        # Backslashes and single quotes must be escaped inside a Drive query
        # string literal, otherwise such folder names break the query.
        escaped_name = self.folder_path.replace("\\", "\\\\").replace("'", "\\'")
        query = ("mimeType='application/vnd.google-apps.folder' "
                 "and name='{}' and trashed=false".format(escaped_name))

        page_token = None
        while True:
            response = self.service.files().list(
                q=query,
                spaces='drive',
                fields='nextPageToken, files(id, name)',
                pageToken=page_token).execute()
            for folder in response.get('files', []):
                folder_id = folder.get('id')
                if folder_id:
                    return folder_id
            page_token = response.get('nextPageToken', None)
            if page_token is None:
                break

        # No existing folder was found: create one.
        folder_metadata = {
            'name': self.folder_path,
            'mimeType': 'application/vnd.google-apps.folder'
        }
        created = self.service.files().create(
            body=folder_metadata, fields='id').execute()
        return created.get('id')

    def _upload_file(self, filename: str, current_path: str) -> str:
        """
        Helper method to upload a file to drive, in the directory LIBRE-ary
        is configured to use. Returns the Drive id of the uploaded file.

        :param filename: name the file should have in Drive
        :param current_path: place where the file is right now
        """
        import mimetypes

        # Guess the content type from the file name rather than labelling
        # every upload as a JPEG; fall back to a generic binary type.
        guessed_type, _ = mimetypes.guess_type(current_path)
        mimetype = guessed_type or 'application/octet-stream'

        file_metadata = {'name': filename, 'parents': [self.dir_id]}
        media = MediaFileUpload(current_path, mimetype=mimetype)
        uploaded = self.service.files().create(
            body=file_metadata, media_body=media, fields='id').execute()
        return uploaded.get('id')

    def store(self, r_id: str) -> str:
        """
        Store a copy of a resource in this adapter and return its locator
        (the Drive file id).

        Store assumes that the file is in the `dropbox_dir`. AdapterManager
        will always verify that this is the case.

        If a non-canonical copy from this adapter already exists, nothing is
        uploaded and the locator of the existing copy is returned.

        :param r_id: the resource to store's UUID
        :raises IndexError: if `r_id` has no row in the `resources` table
        :raises ChecksumMismatchException: if the file in the dropbox does
            not match the checksum recorded in the metadata db
        """
        file_metadata = self.load_metadata(r_id)[0]
        checksum = file_metadata[4]
        name = file_metadata[3]

        # Check for an existing copy before doing any file I/O.
        other_copies = self.cursor.execute(
            "select * from copies where resource_id=? and adapter_identifier=? "
            "and not canonical = 1 limit 1",
            (r_id, self.adapter_id)).fetchall()
        if other_copies:
            print("Other copies from this adapter exist")
            return other_copies[0][3]

        current_location = "{}/{}".format(self.dropbox_dir, name)
        sha1_hashed = self._sha1_of_file(current_location)
        if sha1_hashed != checksum:
            print("Checksum Mismatch")
            raise ChecksumMismatchException

        new_name = "{}_{}".format(r_id, name)
        locator = self._upload_file(new_name, current_location)

        self.cursor.execute(
            "insert into copies values ( ?,?, ?, ?, ?, ?, ?)",
            [None, r_id, self.adapter_id, locator, sha1_hashed,
             self.adapter_type, False])
        self.conn.commit()
        return locator

    def retrieve(self, r_id: str) -> str:
        """
        Retrieve a copy of a resource from this adapter.

        Retrieve assumes that the file can be stored to the `output_dir`.
        AdapterManager will always verify that this is the case.

        Returns the path to the resource. May overwrite files in the
        `output_dir`.

        :param r_id: the resource to retrieve's UUID
        :raises ResourceNotIngestedException: if `r_id` is not in the
            `resources` table
        :raises NoCopyExistsException: if this adapter holds no copy
        :raises ChecksumMismatchException: if the checksum recorded for the
            copy differs from the checksum recorded for the resource
        """
        resource_rows = self.load_metadata(r_id)
        if not resource_rows:
            raise ResourceNotIngestedException
        filename = resource_rows[0][3]
        expected_hash = resource_rows[0][4]

        copy_rows = self.cursor.execute(
            "select * from copies where resource_id=? and adapter_identifier=? limit 1",
            (r_id, self.adapter_id)).fetchall()
        if not copy_rows:
            raise NoCopyExistsException
        copy_info = copy_rows[0]
        copy_locator = copy_info[3]
        recorded_hash = copy_info[4]

        # Compare the copy's checksum with the resource's checksum. The old
        # code compared the copy's checksum with itself, which never failed.
        # The downloaded bytes are deliberately not re-hashed here:
        # get_actual_checksum relies on retrieve succeeding for that.
        if recorded_hash != expected_hash:
            raise ChecksumMismatchException

        new_location = "{}/{}".format(self.ret_dir, filename)
        self._download_file(copy_locator, new_location)
        return new_location

    def _download_file(self, locator: str, new_loc: str) -> None:
        """
        Helper method to download a file from drive to a local path.

        :param locator: google drive ID
        :param new_loc: place the file should go
        """
        request = self.service.files().get_media(fileId=locator)
        # The context manager guarantees the data is flushed and the handle
        # closed, even if a chunk download raises.
        with open(new_loc, "wb") as handle:
            downloader = MediaIoBaseDownload(handle, request)
            done = False
            while not done:
                _status, done = downloader.next_chunk()

    def update(self, r_id: str, updated: str) -> None:
        """
        Update a resource with a new object. Preserves UUID and all other
        metadata (levels, etc.)

        Not implemented for this adapter yet: the call is a no-op.

        :param r_id: the UUID of the object you'd like to update
        :param updated: path to the contents of the updated object.
        """
        pass

    def _store_canonical(self, current_path: str, r_id: str,
                         checksum: str, filename: str) -> str:
        """
        Store a canonical copy and return its locator (the Drive file id).

        If we're using the GoogleDrive as a canonical adapter, we need to be
        able to store from a current path, taking in a generated UUID, rather
        than looking info up from the database.

        :param current_path: current path to object
        :param r_id: UUID of resource you're storing
        :param checksum: checksum of resource
        :param filename: filename of resource you're storing
        :raises StorageFailedException: if a canonical copy from this adapter
            already exists
        :raises ChecksumMismatchException: if the file at `current_path`
            does not match `checksum`
        """
        other_copies = self.cursor.execute(
            "select * from copies where resource_id=? and adapter_identifier=? "
            "and canonical = 1 limit 1",
            (str(r_id), self.adapter_id)).fetchall()
        if other_copies:
            print("Other canonical copies from this adapter exist")
            raise StorageFailedException

        sha1_hashed = self._sha1_of_file(current_path)
        if sha1_hashed != checksum:
            raise ChecksumMismatchException

        new_name = "{}_{}".format(r_id, filename)
        locator = self._upload_file(new_name, current_path)

        self.cursor.execute(
            "insert into copies values ( ?,?, ?, ?, ?, ?, ?)",
            [None, r_id, self.adapter_id, locator, sha1_hashed,
             self.adapter_type, True])
        self.conn.commit()
        return locator

    def delete(self, r_id: str) -> None:
        """
        Delete a copy of a resource from this adapter.
        Delete the corresponding entry in the `copies` table.

        Does nothing if this adapter holds no non-canonical copy.

        :param r_id: the resource to delete's UUID
        """
        copy_rows = self.cursor.execute(
            "select * from copies where resource_id=? and adapter_identifier=? "
            "and not canonical = 1 limit 1",
            (r_id, self.adapter_id)).fetchall()
        if not copy_rows:
            # We've already deleted, probably as part of another level
            return

        copy_info = copy_rows[0]
        copy_locator = copy_info[3]
        self.service.files().delete(fileId=copy_locator).execute()

        self.cursor.execute("delete from copies where copy_id=?",
                            [copy_info[0]])
        self.conn.commit()

    def _delete_canonical(self, r_id: str) -> None:
        """
        Delete a canonical copy of a resource from this adapter.
        Delete the corresponding entry in the `copies` table.

        :param r_id: the resource to delete's UUID
        :raises IndexError: if this adapter holds no canonical copy
        """
        copy_info = self.cursor.execute(
            "select * from copies where resource_id=? and adapter_identifier=? "
            "and canonical = 1 limit 1",
            (r_id, self.adapter_id)).fetchall()[0]
        copy_locator = copy_info[3]
        self.service.files().delete(fileId=copy_locator).execute()

        self.cursor.execute("delete from copies where copy_id=?",
                            [copy_info[0]])
        self.conn.commit()

    def load_metadata(self, r_id: str) -> List[List[str]]:
        """
        Get a summary of information about a resource. That summary includes:

        `id`, `path`, `levels`, `file name`, `checksum`, `object uuid`,
        `description`

        This method trusts the metadata database. There should be a separate
        method to verify the metadata db so that we know we can trust this
        info.

        Returns a list of matching rows, empty if the resource is unknown.

        :param r_id: UUID of resource you'd like to learn about
        """
        # Parameterised so that a UUID containing quotes can neither break
        # the statement nor inject SQL.
        return self.cursor.execute(
            "select * from resources where uuid=?", (r_id,)).fetchall()

    def get_actual_checksum(self, r_id: str,
                            delete_after_download: bool = True) -> str:
        """
        Return an exact checksum of a resource, not relying on the
        metadata db: the copy is retrieved and its SHA-1 is recomputed.

        :param r_id: UUID of the resource to check
        :param delete_after_download: if True, remove the downloaded file
            from the output directory once it has been hashed
        """
        new_path = self.retrieve(r_id)
        sha1_hashed = self._sha1_of_file(new_path)

        if delete_after_download:
            os.remove(new_path)

        return sha1_hashed