Skip to content

elasticsearch

This module provides a connector to an Elasticsearch instance to store and retrieve FAIR-DOs.

ElasticsearchConnector

This class provides a connector to an Elasticsearch instance to store and retrieve FAIR-DOs.

Attributes:

Name Type Description
url str

The URL of the Elasticsearch instance

apikey str

The API key to access the Elasticsearch instance

indexName str

The name of the index to use

Source code in src/nmr_FAIR_DOs/connectors/elasticsearch.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
class ElasticsearchConnector:
    """
    Connector that stores FAIR-DOs in, and retrieves them from, an Elasticsearch index.

    Attributes:
        url (str): The URL of the Elasticsearch instance
        apikey (str): The API key to access the Elasticsearch instance
        indexName (str): The name of the index to use
    """

    def __init__(self, url: str, apikey: str, indexName: str):
        """
        Creates an Elasticsearch connector and makes sure the target index exists.

        Args:
            url (str): The URL of the Elasticsearch instance
            apikey (str): The API key to access the Elasticsearch instance
            indexName (str): The name of the index to use

        Raises:
            ValueError: If ``url`` or ``indexName`` is None or empty.
            Exception: If the Elasticsearch instance does not answer the ping.
        """
        # `not x` already covers both None and the empty string
        if not url:
            raise ValueError("URL must not be None or empty")
        if not indexName:
            raise ValueError("Index name must not be None or empty")

        self._url = url
        self._apikey = apikey
        self._indexName = indexName

        self._client = Elasticsearch(
            hosts=self._url, api_key=self._apikey
        )  # create the Elasticsearch client

        # Verify the connection BEFORE reporting it as established; calling
        # info() first would bypass this check on an unreachable instance.
        if not self._client.ping():
            raise Exception("Could not connect to Elasticsearch")

        logger.info(f"Connected to Elasticsearch: {self._client.info()}")

        # Create the index if it does not exist
        if self._client.indices.exists(index=indexName):
            logger.info("Index " + indexName + " already exists")
        else:
            self._client.indices.create(index=indexName)
            logger.info("Created index " + indexName)

    async def addPIDRecord(self, pidRecord: PIDRecord):
        """
        Adds a PID record to the Elasticsearch index.

        The record is converted to its Elasticsearch JSON representation and
        indexed under its PID. A non-200/201 response is logged as an error;
        no exception is raised by this method for it.

        Args:
            pidRecord (PIDRecord): The PID record to add to the Elasticsearch index.
        """
        result = await _generate_elastic_JSON_from_PIDRecord(
            pidRecord
        )  # generate the JSON object from the PID record
        pid = result["pid"]

        response = self._client.index(
            index=self._indexName, id=pid, document=result
        )  # store the JSON object in the Elasticsearch index

        if response.meta.status not in (200, 201):
            # The details are formatted into the message itself: extra positional
            # logger arguments without placeholders would not be rendered.
            logger.error(
                f"Error storing FAIR-DO in elasticsearch index: {pid} "
                f"(document: {result}, response: {response})"
            )
            return  # do not report success after a failure

        logger.info(
            f"Stored FAIR-DO in elasticsearch index: {pid} "
            f"(document: {result}, response: {response})"
        )

    async def addPIDRecords(self, pidRecords: list[PIDRecord]):
        """
        Adds a list of PID records to the Elasticsearch index.
        This method uses the bulk API of Elasticsearch to store the PID records more efficiently.

        Every record is submitted as a ``create`` action keyed by its PID.
        NOTE(review): ``create`` presumably rejects PIDs that are already indexed,
        unlike ``addPIDRecord`` which uses ``index`` - confirm this is intended.

        Args:
            pidRecords (list[PIDRecord]): The list of PID records to add to the Elasticsearch index.

        Raises:
            Exception: Any error raised by the ``bulk`` helper is propagated unchanged;
                this method does not inspect the bulk result itself.
        """
        # Generate one bulk action per PID record
        actions = [
            {
                "_op_type": "create",
                "_index": self._indexName,
                "_id": pidRecord.getPID(),
                "_source": await _generate_elastic_JSON_from_PIDRecord(pidRecord),
            }
            for pidRecord in pidRecords
        ]

        response = bulk(
            self._client, actions
        )  # store the JSON objects in the Elasticsearch index
        logger.debug(
            f"Elasticsearch response for bulk insert of PID records: {response}"
        )

    def searchForPID(self, presumedPID: str) -> str:
        """
        Searches for a PID in the Elasticsearch index.
        If a record with the PID or the digitalObjectLocation equal to the presumed PID is found, the PID is returned.

        Only the first (best-scoring) hit of the query is examined.

        Args:
            presumedPID (str): The PID to search for.

        Returns:
            str: The PID of the found record.

        Raises:
            Exception: If the response status is not 200.
            Exception: If no record with the PID or the digitalObjectLocation equal to the presumed PID is found.
            Exception: If the PID of the found record does not match the presumed PID.
        """
        response = self._client.search(  # search for the PID in the Elasticsearch index
            index=self._indexName,
            body={
                "query": {
                    "multi_match": {
                        "type": "best_fields",
                        "query": presumedPID,  # search for the presumed PID
                        # search in the digitalObjectLocation and the pid fields
                        "fields": ["digitalObjectLocation", "pid"],
                    }
                }
            },
        )

        logger.debug(
            f"Elasticsearch response for search query: {presumedPID} "
            f"(response: {response})"
        )

        if response.meta.status != 200:
            logger.error(
                f"Error retrieving FAIR-DO from elasticsearch index: {presumedPID} "
                f"(response: {response})"
            )
            raise Exception(
                "Error retrieving FAIR-DO from elasticsearch index: " + presumedPID,
                response,
            )

        # Take the source document of the first hit, if there is any hit at all
        hits = response["hits"]["hits"]
        result = hits[0]["_source"] if hits else None

        if result is None:
            logger.warning(
                f"No FAIR-DO found in elasticsearch index: {presumedPID} "
                f"(response: {response})"
            )
            raise Exception(
                "No FAIR-DO found in elasticsearch index: " + presumedPID, response
            )

        # The query is a relevance search, so the best hit is not guaranteed to
        # be an exact match; .get() tolerates documents without a location.
        if (
            result["pid"] != presumedPID
            and result.get("digitalObjectLocation") != presumedPID
        ):
            logger.warning(
                f"PID of retrieved FAIR-DO does not match requested PID: {presumedPID} "
                f"(document: {result})"
            )
            raise Exception(
                "PID of retrieved FAIR-DO does not match requested PID: " + presumedPID,
                result,
            )

        pid = result["pid"]  # get the PID from the result
        logger.info(
            f"Retrieved possible FAIRDO from elasticsearch index: {pid} "
            f"(document: {result})"
        )

        return pid

__init__

__init__(url: str, apikey: str, indexName: str)

Creates an Elasticsearch connector

Parameters:

Name Type Description Default
url str

The URL of the Elasticsearch instance

required
apikey str

The API key to access the Elasticsearch instance

required
indexName str

The name of the index to use

required
Source code in src/nmr_FAIR_DOs/connectors/elasticsearch.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
def __init__(self, url: str, apikey: str, indexName: str):
    """
    Creates an Elasticsearch connector and makes sure the target index exists.

    Args:
        url (str): The URL of the Elasticsearch instance
        apikey (str): The API key to access the Elasticsearch instance
        indexName (str): The name of the index to use

    Raises:
        ValueError: If ``url`` or ``indexName`` is None or empty.
        Exception: If the Elasticsearch instance does not answer the ping.
    """
    # `not x` already covers both None and the empty string
    if not url:
        raise ValueError("URL must not be None or empty")
    if not indexName:
        raise ValueError("Index name must not be None or empty")

    self._url = url
    self._apikey = apikey
    self._indexName = indexName

    self._client = Elasticsearch(
        hosts=self._url, api_key=self._apikey
    )  # create the Elasticsearch client

    # Verify the connection BEFORE reporting it as established; calling
    # info() first would bypass this check on an unreachable instance.
    if not self._client.ping():
        raise Exception("Could not connect to Elasticsearch")

    logger.info(f"Connected to Elasticsearch: {self._client.info()}")

    # Create the index if it does not exist
    if self._client.indices.exists(index=indexName):
        logger.info("Index " + indexName + " already exists")
    else:
        self._client.indices.create(index=indexName)
        logger.info("Created index " + indexName)

addPIDRecord async

addPIDRecord(pidRecord: PIDRecord)

Adds a PID record to the Elasticsearch index.

Parameters:

Name Type Description Default
pidRecord PIDRecord

The PID record to add to the Elasticsearch index.

required
Source code in src/nmr_FAIR_DOs/connectors/elasticsearch.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def addPIDRecord(self, pidRecord: PIDRecord):
    """
    Adds a PID record to the Elasticsearch index.

    The record is converted to its Elasticsearch JSON representation and
    indexed under its PID. A non-200/201 response is logged as an error;
    no exception is raised by this method for it.

    Args:
        pidRecord (PIDRecord): The PID record to add to the Elasticsearch index.
    """
    result = await _generate_elastic_JSON_from_PIDRecord(
        pidRecord
    )  # generate the JSON object from the PID record
    pid = result["pid"]

    response = self._client.index(
        index=self._indexName, id=pid, document=result
    )  # store the JSON object in the Elasticsearch index

    if response.meta.status not in (200, 201):
        # The details are formatted into the message itself: extra positional
        # logger arguments without placeholders would not be rendered.
        logger.error(
            f"Error storing FAIR-DO in elasticsearch index: {pid} "
            f"(document: {result}, response: {response})"
        )
        return  # do not report success after a failure

    logger.info(
        f"Stored FAIR-DO in elasticsearch index: {pid} "
        f"(document: {result}, response: {response})"
    )

addPIDRecords async

addPIDRecords(pidRecords: list[PIDRecord])

Adds a list of PID records to the Elasticsearch index. This method uses the bulk API of Elasticsearch to store the PID records more efficiently.

Parameters:

Name Type Description Default
pidRecords list[PIDRecord]

The list of PID records to add to the Elasticsearch index.

required

Raises:

Type Description
Exception

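If storing the records fails: errors raised by the Elasticsearch bulk helper are propagated unchanged (the method itself does not check a response status).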

Source code in src/nmr_FAIR_DOs/connectors/elasticsearch.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
async def addPIDRecords(self, pidRecords: list[PIDRecord]):
    """
    Adds a list of PID records to the Elasticsearch index.
    This method uses the bulk API of Elasticsearch to store the PID records more efficiently.

    Every record is submitted as a ``create`` action keyed by its PID.
    NOTE(review): ``create`` presumably rejects PIDs that are already indexed,
    unlike ``addPIDRecord`` which uses ``index`` - confirm this is intended.

    Args:
        pidRecords (list[PIDRecord]): The list of PID records to add to the Elasticsearch index.

    Raises:
        Exception: Any error raised by the ``bulk`` helper is propagated unchanged;
            this method does not inspect the bulk result itself.
    """
    # Generate one bulk action per PID record
    actions = [
        {
            "_op_type": "create",
            "_index": self._indexName,
            "_id": pidRecord.getPID(),
            "_source": await _generate_elastic_JSON_from_PIDRecord(pidRecord),
        }
        for pidRecord in pidRecords
    ]

    response = bulk(
        self._client, actions
    )  # store the JSON objects in the Elasticsearch index
    # Format the result into the message: an extra positional logger argument
    # without a placeholder would not be rendered.
    logger.debug(
        f"Elasticsearch response for bulk insert of PID records: {response}"
    )

searchForPID

searchForPID(presumedPID: str) -> str

Searches for a PID in the Elasticsearch index. If a record with the PID or the digitalObjectLocation equal to the presumed PID is found, the PID is returned.

Parameters:

Name Type Description Default
presumedPID str

The PID to search for.

required

Returns:

Name Type Description
str str

The PID of the found record.

Raises:

Type Description
Exception

If the response status is not 200.

Exception

If no record with the PID or the digitalObjectLocation equal to the presumed PID is found.

Exception

If the PID of the found record does not match the presumed PID.

Source code in src/nmr_FAIR_DOs/connectors/elasticsearch.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def searchForPID(self, presumedPID: str) -> str:
    """
    Searches for a PID in the Elasticsearch index.
    If a record with the PID or the digitalObjectLocation equal to the presumed PID is found, the PID is returned.

    Only the first (best-scoring) hit of the query is examined.

    Args:
        presumedPID (str): The PID to search for.

    Returns:
        str: The PID of the found record.

    Raises:
        Exception: If the response status is not 200.
        Exception: If no record with the PID or the digitalObjectLocation equal to the presumed PID is found.
        Exception: If the PID of the found record does not match the presumed PID.
    """
    response = self._client.search(  # search for the PID in the Elasticsearch index
        index=self._indexName,
        body={
            "query": {
                "multi_match": {
                    "type": "best_fields",
                    "query": presumedPID,  # search for the presumed PID
                    # search in the digitalObjectLocation and the pid fields
                    "fields": ["digitalObjectLocation", "pid"],
                }
            }
        },
    )

    # Details are formatted into the messages themselves: extra positional
    # logger arguments without placeholders would not be rendered.
    logger.debug(
        f"Elasticsearch response for search query: {presumedPID} "
        f"(response: {response})"
    )

    if response.meta.status != 200:
        logger.error(
            f"Error retrieving FAIR-DO from elasticsearch index: {presumedPID} "
            f"(response: {response})"
        )
        raise Exception(
            "Error retrieving FAIR-DO from elasticsearch index: " + presumedPID,
            response,
        )

    # Take the source document of the first hit, if there is any hit at all
    hits = response["hits"]["hits"]
    result = hits[0]["_source"] if hits else None

    if result is None:
        logger.warning(
            f"No FAIR-DO found in elasticsearch index: {presumedPID} "
            f"(response: {response})"
        )
        raise Exception(
            "No FAIR-DO found in elasticsearch index: " + presumedPID, response
        )

    # The query is a relevance search, so the best hit is not guaranteed to
    # be an exact match; .get() tolerates documents without a location.
    if (
        result["pid"] != presumedPID
        and result.get("digitalObjectLocation") != presumedPID
    ):
        logger.warning(
            f"PID of retrieved FAIR-DO does not match requested PID: {presumedPID} "
            f"(document: {result})"
        )
        raise Exception(
            "PID of retrieved FAIR-DO does not match requested PID: " + presumedPID,
            result,
        )

    pid = result["pid"]  # get the PID from the result
    logger.info(
        f"Retrieved possible FAIRDO from elasticsearch index: {pid} "
        f"(document: {result})"
    )

    return pid