9
9
10
10
import biothings_client
11
11
12
- from .exceptions import InvalidCurieError , TRAPIInputError
13
- from .settings import ANNOTATOR_CLIENTS , SERVICE_PROVIDER_API_HOST
14
- from . transformer import ResponseTransformer
15
- from .utils import batched , get_client , get_dotfield_value , group_by_subfield , parse_curie
12
+ from biothings_annotator . annotator .exceptions import InvalidCurieError , TRAPIInputError
13
+ from biothings_annotator . annotator .settings import ANNOTATOR_CLIENTS , SERVICE_PROVIDER_API_HOST
14
+ from biothings_annotator . annotator . transformer import ResponseTransformer , load_atc_cache
15
+ from biothings_annotator . annotator .utils import batched , get_client , get_dotfield_value , group_by_subfield , parse_curie
16
16
17
17
logger = logging .getLogger (__name__ )
18
18
19
19
20
20
class Annotator :
21
-
22
21
def __init__ (self ):
23
22
self .api_host = os .environ .get ("SERVICE_PROVIDER_API_HOST" , SERVICE_PROVIDER_API_HOST )
24
23
25
- def query_biothings (
24
+ async def query_biothings (
26
25
self , node_type : str , query_list : List [str ], fields : Optional [Union [str , List [str ]]] = None
27
26
) -> Dict :
28
27
"""
29
28
Query biothings client based on node_type for a list of ids
30
29
"""
31
30
client = get_client (node_type , self .api_host )
32
- if not isinstance (client , biothings_client .BiothingClient ):
31
+ if not isinstance (client , biothings_client .AsyncBiothingClient ):
33
32
logger .error ("Failed to get the biothings client for %s type. This type is skipped." , node_type )
34
33
return {}
35
34
36
35
fields = fields or ANNOTATOR_CLIENTS [node_type ]["fields" ]
37
36
scopes = ANNOTATOR_CLIENTS [node_type ]["scopes" ]
38
37
logger .info ("Querying annotations for %s %ss..." , len (query_list ), node_type )
39
- res = client .querymany (query_list , scopes = scopes , fields = fields )
38
+ res = await client .querymany (query_list , scopes = scopes , fields = fields )
40
39
logger .info ("Done. %s annotation objects returned." , len (res ))
41
40
grouped_response = group_by_subfield (collection = res , search_key = "query" )
42
41
return grouped_response
43
42
44
- def transform (self , res_by_id , node_type ):
43
+ async def transform (self , res_by_id : Dict , node_type : str ):
45
44
"""
46
45
Perform any transformation on the annotation objects in place; the same object is also returned.
47
46
res_by_id is the output of query_biothings, node_type is the same passed to query_biothings
48
47
"""
49
48
logger .info ("Transforming output annotations for %s %ss..." , len (res_by_id ), node_type )
50
- transformer = ResponseTransformer (res_by_id , node_type )
49
+ atc_cache = await load_atc_cache (self .api_host )
50
+ transformer = ResponseTransformer (res_by_id , node_type , self .api_host , atc_cache )
51
51
transformer .transform ()
52
52
logger .info ("Done." )
53
53
return res_by_id
54
54
55
- def append_extra_annotations (self , node_d : Dict , node_id_subset : Optional [List [str ]] = None , batch_n : int = 1000 ):
55
+ async def append_extra_annotations (
56
+ self , node_d : Dict , node_id_subset : Optional [List [str ]] = None , batch_n : int = 1000
57
+ ):
56
58
"""
57
59
Append extra annotations to the existing node_d
58
60
"""
@@ -61,7 +63,7 @@ def append_extra_annotations(self, node_d: Dict, node_id_subset: Optional[List[s
61
63
cnt = 0
62
64
logger .info ("Retrieving extra annotations..." )
63
65
for node_id_batch in batched (node_id_list , batch_n ):
64
- extra_res = extra_api .querymany (node_id_batch , scopes = "_id" , fields = "all" )
66
+ extra_res = await extra_api .querymany (node_id_batch , scopes = "_id" , fields = "all" )
65
67
for hit in extra_res :
66
68
if hit .get ("notfound" , False ):
67
69
continue
@@ -83,7 +85,7 @@ def append_extra_annotations(self, node_d: Dict, node_id_subset: Optional[List[s
83
85
cnt += 1
84
86
logger .info ("Done. %s extra annotations appended." , cnt )
85
87
86
- def annotate_curie (
88
+ async def annotate_curie (
87
89
self , curie : str , raw : bool = False , fields : Optional [Union [str , List [str ]]] = None , include_extra : bool = True
88
90
) -> Dict :
89
91
"""
@@ -92,18 +94,21 @@ def annotate_curie(
92
94
node_type , _id = parse_curie (curie )
93
95
if not node_type :
94
96
raise InvalidCurieError (curie )
95
- res = self .query_biothings (node_type , [_id ], fields = fields )
97
+ res = await self .query_biothings (node_type , [_id ], fields = fields )
96
98
if not raw :
97
- res = self .transform (res , node_type )
99
+ res = await self .transform (res , node_type )
98
100
# res = [self.transform(r) for r in res[_id]]
99
101
if res and include_extra :
100
- self .append_extra_annotations (res )
101
- return {curie : res .get (_id , {})}
102
+ await self .append_extra_annotations (res )
103
+
104
+ curie_annotation = {curie : res .get (_id , {})}
105
+ return curie_annotation
102
106
103
- def _annotate_node_list_by_type (
107
+ async def _annotate_node_list_by_type (
104
108
self , node_list_by_type : Dict , raw : bool = False , fields : Optional [Union [str , List [str ]]] = None
105
109
) -> Iterable [tuple ]:
106
- """This is a helper method re-used in both annotate_curie_list and annotate_trapi methods
110
+ """
111
+ This is a helper method re-used in both annotate_curie_list and annotate_trapi methods
107
112
It returns a generator of tuples of (original_node_id, annotation_object) for each node_id,
108
113
passed via node_list_by_type.
109
114
"""
@@ -117,11 +122,12 @@ def _annotate_node_list_by_type(
117
122
118
123
# this is the list of query ids like 1017
119
124
query_list = [parse_curie (_id , return_type = False , return_id = True ) for _id in node_list_by_type [node_type ]]
125
+
120
126
# query_id to original id mapping
121
127
node_id_d = dict (zip (query_list , node_list ))
122
- res_by_id = self .query_biothings (node_type , query_list , fields = fields )
128
+ res_by_id = await self .query_biothings (node_type , query_list , fields = fields )
123
129
if not raw :
124
- res_by_id = self .transform (res_by_id , node_type )
130
+ res_by_id = await self .transform (res_by_id , node_type )
125
131
126
132
# map back to original node ids
127
133
# NOTE: we don't want to use `for node_id in res_by_id:` here, since we will modify res_by_id in the loop
@@ -131,7 +137,7 @@ def _annotate_node_list_by_type(
131
137
res_by_id [orig_node_id ] = res_by_id .pop (node_id )
132
138
yield (orig_node_id , res_by_id [orig_node_id ])
133
139
134
- def annotate_curie_list (
140
+ async def annotate_curie_list (
135
141
self ,
136
142
curie_list : Union [List [str ], Iterable [str ]],
137
143
raw : bool = False ,
@@ -154,14 +160,15 @@ def annotate_curie_list(
154
160
else :
155
161
logger .warning ("Unsupported Curie prefix: %s. Skipped!" , node_id )
156
162
157
- for node_id , res in self ._annotate_node_list_by_type (node_list_by_type , raw = raw , fields = fields ):
163
+ async for node_id , res in self ._annotate_node_list_by_type (node_list_by_type , raw = raw , fields = fields ):
158
164
node_d [node_id ] = res
165
+
159
166
if include_extra :
160
167
# currently, we only need to append extra annotations for chem nodes
161
168
self .append_extra_annotations (node_d , node_id_subset = node_list_by_type .get ("chem" , []))
162
169
return node_d
163
170
164
- def annotate_trapi (
171
+ async def annotate_trapi (
165
172
self ,
166
173
trapi_input : Dict ,
167
174
append : bool = False ,
@@ -176,8 +183,8 @@ def annotate_trapi(
176
183
try :
177
184
node_d = get_dotfield_value ("message.knowledge_graph.nodes" , trapi_input )
178
185
assert isinstance (node_d , dict )
179
- except (KeyError , ValueError , AssertionError ):
180
- raise TRAPIInputError (trapi_input )
186
+ except (KeyError , ValueError , AssertionError ) as access_error :
187
+ raise TRAPIInputError (trapi_input ) from access_error
181
188
182
189
# if limit is set, we truncate the node_d to that size
183
190
if limit :
@@ -203,7 +210,7 @@ def annotate_trapi(
203
210
logger .warning ("Unsupported Curie prefix: %s. Skipped!" , node_id )
204
211
205
212
_node_d = {}
206
- for node_id , res in self ._annotate_node_list_by_type (node_list_by_type , raw = raw , fields = fields ):
213
+ async for node_id , res in self ._annotate_node_list_by_type (node_list_by_type , raw = raw , fields = fields ):
207
214
_node_d [node_id ] = res
208
215
209
216
if include_extra :
0 commit comments