Add elasticsearch snapshotting commands #11769

Closed
wants to merge 3 commits
82 changes: 81 additions & 1 deletion arches/app/search/search.py
@@ -23,12 +23,14 @@
import logging
import warnings
from datetime import datetime
from elasticsearch import Elasticsearch, helpers, ElasticsearchWarning
from elasticsearch import Elasticsearch, helpers, ElasticsearchWarning, NotFoundError
from elasticsearch.exceptions import RequestError
from elasticsearch.helpers import BulkIndexError
from arches.app.models.system_settings import settings
from arches.app.utils.betterJSONSerializer import JSONSerializer, JSONDeserializer

from arches.app.search.mappings import CONCEPTS_INDEX, RESOURCES_INDEX, TERMS_INDEX


class SearchEngine(object):
def __init__(self, **kwargs):
@@ -44,6 +46,84 @@ def __init__(self, **kwargs):
if "cloud_id" in settings.ELASTICSEARCH_CONNECTION_OPTIONS:
serializer.utf_encode = True

    def create_snapshot(self, repository, snapshot=None, **kwargs):
        if snapshot is None:
            # default the snapshot name to "<APP_NAME>_<seconds since the Unix epoch>"
            seconds_since_epoch = int(
                (datetime.now() - datetime(1970, 1, 1)).total_seconds()
            )
            snapshot = "{}_{}".format(settings.APP_NAME, seconds_since_epoch)

indices = self.get_index_names()

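        # include_global_state=False limits the snapshot to the listed Arches
        # indices rather than also capturing cluster-wide state.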
return self.es.snapshot.create(
repository=repository,
snapshot=snapshot,
feature_states=None,
include_global_state=False,
indices=indices,
**kwargs,
)

def get_index_names(self):
custom_indexes = [
index["name"] for index in settings.ELASTICSEARCH_CUSTOM_INDEXES
]

raw_indices = [
RESOURCES_INDEX,
CONCEPTS_INDEX,
TERMS_INDEX,
] + custom_indexes

indices = [self._add_prefix(index) for index in raw_indices]
return indices

def check_snapshot(self, repository, snapshot, **kwargs):
try:
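            # snapshot.status reports per-snapshot state; "SUCCESS" means every
            # shard was captured, and an unknown snapshot raises NotFoundError.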
return (
self.es.snapshot.status(
repository=repository, snapshot=snapshot, **kwargs
)["snapshots"][0]["state"]
== "SUCCESS"
)
        except (KeyError, NotFoundError):
            return False

def restore_snapshot(self, repository, snapshot, **kwargs):
return self.es.snapshot.restore(
repository=repository, snapshot=snapshot, **kwargs
)

def restore_status(self):
index_statuses = self.es.indices.recovery(index=self.get_index_names())
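        # indices.recovery reports per-shard recovery progress; a shard's
        # "stage" reads "DONE" once it has finished restoring.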
for key in index_statuses.keys():
for shard in index_statuses[key]["shards"]:
if shard["stage"] != "DONE":
return "in progress"
return "done"

def list_snapshots(self, repository, **kwargs):
try:
return sorted(
[
item["snapshot"]
for item in self.es.snapshot.get(
repository=repository, snapshot="_all", **kwargs
)["snapshots"]
],
reverse=True,
)
except KeyError:
return []

def _add_prefix(self, *args, **kwargs):
if args:
index = args[0].strip()
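
Below is a rough usage sketch (not part of the diff) of how the new SearchEngine snapshot helpers might be exercised from a Django shell. It assumes a snapshot repository named "arches_backups" (a hypothetical name) is already registered on the Elasticsearch cluster; repository registration is outside the scope of this change.

from arches.app.search.search_engine_factory import SearchEngineFactory

se = SearchEngineFactory().create()

# Snapshot the Arches-managed indices; the snapshot name defaults to
# "<APP_NAME>_<seconds since the Unix epoch>".
se.create_snapshot("arches_backups")

# Snapshot names come back sorted in descending order, so the most recent one
# is normally first. check_snapshot() reports whether it completed; the restore
# path is wired through the es management command below, which deletes the
# current indexes before restoring.
snapshots = se.list_snapshots("arches_backups")
if snapshots:
    print(se.check_snapshot("arches_backups", snapshots[0]))  # True on SUCCESS
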
70 changes: 69 additions & 1 deletion arches/management/commands/es.py
@@ -34,6 +34,8 @@
)
import arches.app.utils.index_database as index_database_util

from arches.app.search.search_engine_factory import SearchEngineFactory

logger = logging.getLogger(__name__)


@@ -59,6 +61,10 @@ def add_arguments(self, parser):
"index_resources_by_transaction",
"add_index",
"delete_index",
"restore_snapshot",
"create_snapshot",
"list_snapshots",
"snapshot_restore_status",
],
help="Operation Type; "
+ "'setup_indexes'=Creates the indexes in Elastic Search needed by the system"
@@ -70,7 +76,11 @@ def add_arguments(self, parser):
+ "'index_resources_by_type'=Indexes only resources of a given resource_model/graph"
+ "'index_resources_by_transaction'=Indexes only resources of a given transaction"
+ "'add_index'=Register a new index in Elasticsearch"
+ "'delete_index'=Deletes a named index from Elasticsearch",
+ "'delete_index'=Deletes a named index from Elasticsearch"
+ "'restore_snapshot'=Restores a named snapshot from Elasticsearch"
+ "'create_snapshot'=Creates a new snapshot in specified Elasticsearch repository"
+ "'list_snapshots'=Lists snapshots in specified Elasticsearch repository"
+ "'snapshot_restore_status'=Lists snapshots restore status for the current indices",
)

parser.add_argument(
@@ -146,6 +156,24 @@ def add_arguments(self, parser):
help="Name of the custom index",
)

parser.add_argument(
"-sn",
"--snapshot_name",
action="store",
dest="snapshot_name",
default=None,
help="Name of the snapshot",
)

parser.add_argument(
"-rn",
"--repository_name",
action="store",
dest="repository_name",
default=None,
help="Name of the snapshot repository",
)

parser.add_argument(
"-mp",
"--use_multiprocessing",
@@ -249,6 +277,14 @@ def handle(self, *args, **options):
max_subprocesses=options["max_subprocesses"],
recalculate_descriptors=options["recalculate_descriptors"],
)
if options["operation"] == "restore_snapshot":
self.restore_snapshot(options["repository_name"], options["snapshot_name"])
if options["operation"] == "create_snapshot":
self.create_snapshot(options["repository_name"], options["snapshot_name"])
if options["operation"] == "list_snapshots":
self.list_snapshots(options["repository_name"])
if options["operation"] == "snapshot_restore_status":
self.snapshot_restore_status()

def register_index(self, name):
es_index = get_index(name)
@@ -320,6 +356,38 @@ def setup_indexes(self, name=None):
else:
self.register_index(name)

def create_snapshot(self, repository_name=None, snapshot_name=None):
if not repository_name:
print("Repaository name is required. Use -rn or --repository_name")
else:
SearchEngineFactory().create().create_snapshot(
repository_name, snapshot_name
)

def restore_snapshot(self, repository_name=None, snapshot_name=None):
if not repository_name or not snapshot_name:
print(
"Snapshot name and repository name are required. Use -sn or --snapshot_name and -rn or --repository_name"
)
else:
if (
SearchEngineFactory()
.create()
.check_snapshot(repository_name, snapshot_name)
):
self.delete_indexes()
SearchEngineFactory().create().restore_snapshot(
repository_name, snapshot_name
)
else:
print("Snapshot does not exist")

def snapshot_restore_status(self):
print(SearchEngineFactory().create().restore_status())

def list_snapshots(self, repository_name):
print(SearchEngineFactory().create().list_snapshots(repository_name))

def delete_indexes(self, name=None):
if name is None:
delete_terms_index()
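
For completeness, a hedged sketch of driving the new operations end to end through the es management command, again assuming a registered repository named "arches_backups" (hypothetical). The same calls map onto the CLI in the usual way, e.g. python manage.py es create_snapshot -rn arches_backups.

from django.core.management import call_command

# Create a snapshot with an auto-generated name, then list what exists.
call_command("es", "create_snapshot", repository_name="arches_backups")
call_command("es", "list_snapshots", repository_name="arches_backups")

# Restore a specific snapshot; the command checks that the snapshot exists,
# deletes the current indexes, and then starts the restore.
call_command(
    "es",
    "restore_snapshot",
    repository_name="arches_backups",
    snapshot_name="my_project_1700000000",  # hypothetical snapshot name
)

# Poll the recovery of the restored indices ("in progress" or "done").
call_command("es", "snapshot_restore_status")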