Page Menu
Home
Search
Configure Global Search
Log In
Files
F13395296
elastic.py
No One
Temporary
Actions
Download File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Size
4 KB
Subscribers
None
elastic.py
View Options
import
concurrent.futures
import
logging
import
typing
import
bson
from
flask_script
import
Manager
from
pillar
import
current_app
from
pillar.api.search
import
index
# Module-level logger for this management script.
log = logging.getLogger(__name__)

# Flask-Script manager that exposes the Elastic maintenance commands below.
manager_elastic = Manager(current_app, usage="Elastic utilities")

# Maps a CLI index name to the task class that resets that Elastic index.
name_to_task = {
    'nodes': index.ResetNodeIndex,
    'users': index.ResetUserIndex,
}

# Number of worker threads used when reindexing documents.
REINDEX_THREAD_COUNT = 5
@manager_elastic.option('indices', nargs='*')
def reset_index(indices: typing.List[str]):
    """Destroy and recreate Elastic indices.

    :param indices: names of the indices to reset; valid names are the
        keys of ``name_to_task`` ('nodes', 'users'). When empty, all
        known indices are reset.
    :raises SystemError: when an unknown index name is given.
    """
    with current_app.app_context():
        if not indices:
            indices = name_to_task.keys()
        for elk_index in indices:
            try:
                task = name_to_task[elk_index]()
            except KeyError:
                # Suppress the KeyError context: the message already names
                # the valid choices, so the chained traceback is just noise.
                raise SystemError('Unknown elk_index, choose from %s' % (
                    ', '.join(name_to_task.keys()))) from None
            task.execute()
def _reindex_users():
    """Reindex all users into Elastic using a small thread pool.

    Logs progress every 100 users and a final indexed/total summary.
    """
    import threading

    db = current_app.db()
    users_coll = db['users']

    # Note that this also finds service accounts, which are filtered out
    # in prepare_user_data(…)
    users = users_coll.find()
    user_count = users.count()
    indexed = 0
    # 'indexed += 1' is a non-atomic read-modify-write, and do_work() runs
    # concurrently on REINDEX_THREAD_COUNT threads — guard it with a lock.
    indexed_lock = threading.Lock()

    log.info('Reindexing %d users in Elastic', user_count)

    from pillar.celery.search_index_tasks import prepare_user_data
    from pillar.api.search import elastic_indexing

    app = current_app.real_app

    def do_work(work_idx_user):
        """Prepare and push a single (index, user) pair to Elastic."""
        nonlocal indexed

        idx, user = work_idx_user
        with app.app_context():
            if idx % 100 == 0:
                log.info('Processing user %d/%d', idx + 1, user_count)
            to_index = prepare_user_data('', user=user)
            if not to_index:
                log.debug('not indexing user %s', user)
                return

            try:
                elastic_indexing.push_updated_user(to_index)
            except (KeyError, AttributeError):
                log.exception('Field is missing for %s', user)
            else:
                with indexed_lock:
                    indexed += 1

    with concurrent.futures.ThreadPoolExecutor(
            max_workers=REINDEX_THREAD_COUNT) as executor:
        result = executor.map(do_work, enumerate(users))

        # When an exception occurs, it's enough to just iterate over the results.
        # That will re-raise the exception in the main thread.
        for ob in result:
            log.debug('result: %s', ob)
    log.info('Reindexed %d/%d users', indexed, user_count)
def _public_project_ids() -> typing.List[bson.ObjectId]:
    """Return the ObjectIDs of all public projects.

    Memoized in setup_app().
    """
    projects_coll = current_app.db('projects')
    # Only fetch the _id field; the rest of the project document is unused.
    cursor = projects_coll.find({'is_private': False}, {'_id': 1})
    ids = [doc['_id'] for doc in cursor]
    return ids
def _reindex_nodes():
    """Reindex all undeleted nodes of public projects using a thread pool.

    Logs progress every 100 nodes and a final indexed/total summary.
    """
    import threading

    db = current_app.db()
    nodes_coll = db['nodes']
    nodes = nodes_coll.find({
        'project': {'$in': _public_project_ids()},
        '_deleted': {'$ne': True},
    })
    node_count = nodes.count()
    indexed = 0
    # 'indexed += 1' is a non-atomic read-modify-write, and do_work() runs
    # concurrently on REINDEX_THREAD_COUNT threads — guard it with a lock.
    indexed_lock = threading.Lock()

    log.info('Nodes %d will be reindexed in Elastic', node_count)

    app = current_app.real_app
    from pillar.celery.search_index_tasks import prepare_node_data
    from pillar.api.search import elastic_indexing

    def do_work(work_idx_node):
        """Prepare and push a single (index, node) pair to Elastic."""
        nonlocal indexed

        idx, node = work_idx_node
        with app.app_context():
            if idx % 100 == 0:
                log.info('Processing node %d/%d', idx + 1, node_count)
            try:
                to_index = prepare_node_data('', node=node)
                elastic_indexing.index_node_save(to_index)
            except (KeyError, AttributeError):
                log.exception('Node %s is missing a field', node)
            else:
                with indexed_lock:
                    indexed += 1

    with concurrent.futures.ThreadPoolExecutor(
            max_workers=REINDEX_THREAD_COUNT) as executor:
        result = executor.map(do_work, enumerate(nodes))

        # When an exception occurs, it's enough to just iterate over the results.
        # That will re-raise the exception in the main thread.
        for ob in result:
            log.debug('result: %s', ob)
    log.info('Reindexed %d/%d nodes', indexed, node_count)
@manager_elastic.option('indexname', nargs='?')
@manager_elastic.option('-r', '--reset', default=False, action='store_true')
def reindex(indexname='', reset=False):
    """Reindex nodes and/or users into Elastic, logging the total duration.

    :param indexname: 'users', 'nodes', or empty to reindex everything.
    :param reset: when True, destroy and recreate the indices first.
    """
    import time
    import datetime

    start = time.time()

    if reset:
        log.info('Resetting first')
        reset_index([indexname] if indexname else [])

    if not indexname:
        log.info('reindex everything..')
        _reindex_nodes()
        _reindex_users()
    elif indexname == 'users':
        log.info('Indexing %s', indexname)
        _reindex_users()
    elif indexname == 'nodes':
        log.info('Indexing %s', indexname)
        _reindex_nodes()
    else:
        # Previously an unknown name silently did nothing; make that visible.
        log.warning('Unknown index %r, choose from %s',
                    indexname, ', '.join(name_to_task.keys()))

    duration = time.time() - start
    log.info('Reindexing took %s', datetime.timedelta(seconds=duration))
File Metadata
Details
Attached
Mime Type
text/x-python
Expires
Thu, Aug 18, 11:36 PM (2 d)
Storage Engine
local-disk
Storage Format
Raw Data
Storage Handle
7b/41/c53744d29801a14177ebc7fa049b
Attached To
rPS Pillar
Event Timeline
Log In to Comment