
utils

This module provides utility functions for the NMR FAIR DOs project.

fetch_data async

fetch_data(url: str, forceFresh: bool = False) -> dict

Fetches data from the specified URL. The data is cached in CACHE_DIR; if a cached copy already exists, it is returned instead of fetching fresh data.

Parameters:

    Name        Type  Description                                                     Default
    url         str   The URL to fetch data from                                      required
    forceFresh  bool  Whether to force fetching fresh data, ignoring any cached copy  False

Returns:

    Type  Description
    dict  The fetched data

Raises:

    Type        Description
    ValueError  If the URL is invalid or the data cannot be fetched

Source code in src/nmr_FAIR_DOs/utils.py
async def fetch_data(url: str, forceFresh: bool = False) -> dict:
    """
    Fetches data from the specified URL.
    The data is cached in the CACHE_DIR.
    If the data is already cached, it is used instead of fetching fresh data.

    Args:
        url (str): The URL to fetch data from
        forceFresh (bool): Whether to force fetching fresh data. This tells the function to ignore cached data.

    Returns:
        dict: The fetched data

    Raises:
        ValueError: If the URL is invalid or the data cannot be fetched
    """
    if not url or not isinstance(url, str):
        raise ValueError("Invalid URL")

    filename = CACHE_DIR + "/" + url.replace("/", "_") + ".json"

    # check if data is cached
    if os.path.isfile(filename) and not forceFresh:
        with open(filename, "r") as f:  # load from cache
            result = json.load(f)  # get JSON
            if result is not None and isinstance(
                result, dict
            ):  # check if JSON is valid
                logger.info(f"Using cached data for {url}")
                return result  # return cached data

    try:
        logger.debug(f"Fetching {url}")
        async with aiohttp.ClientSession() as session:  # create a new session
            async with session.get(url) as response:  # fetch data
                if response.status == 200:  # check if the response is OK
                    data = await response.json()  # read the body exactly once
                    with open(filename, "w") as c:  # save to cache
                        json.dump(data, c)
                    return data  # return fetched data
                else:  # if the response is not OK raise an error
                    logger.error(f"Failed to fetch {url}: HTTP {response.status}")
                    raise ValueError(
                        f"Failed to fetch {url}: {response.status}",
                        response,
                        datetime.now().isoformat(),
                    )
    except Exception as e:  # wrap any error in a ValueError
        logger.error(f"Error fetching {url}: {str(e)}")
        raise ValueError(str(e), url, datetime.now().isoformat())
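
A minimal usage sketch (not part of the module): the URL is only an example, and the call assumes the package is importable and CACHE_DIR exists and is writable.

    import asyncio
    from nmr_FAIR_DOs.utils import fetch_data

    async def main():
        # fetches once, then serves subsequent calls from the cache
        data = await fetch_data("https://spdx.org/licenses/licenses.json")
        print(list(data.keys()))

    asyncio.run(main())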

fetch_multiple async

fetch_multiple(
    urls: list[str], forceFresh: bool = False
) -> list[dict]

Fetches data from multiple URLs. This function is a wrapper around fetch_data that fetches data from multiple URLs concurrently.

Parameters:

    Name        Type       Description                                                     Default
    urls        list[str]  A list of URLs to fetch data from                               required
    forceFresh  bool       Whether to force fetching fresh data, ignoring any cached copy  False

Returns:

    Type        Description
    list[dict]  A list of fetched data

Raises:

    Type        Description
    ValueError  If the URLs are invalid or the data cannot be fetched

Source code in src/nmr_FAIR_DOs/utils.py
async def fetch_multiple(urls: list[str], forceFresh: bool = False) -> list[dict]:
    """
    Fetches data from multiple URLs.
    This function is a wrapper around fetch_data that fetches data from multiple URLs concurrently.

    Args:
        urls (List[str]): A list of URLs to fetch data from
        forceFresh (bool): Whether to force fetching fresh data. This tells the function to ignore cached data.

    Returns:
        List[dict]: A list of fetched data

    Raises:
        ValueError: If the URLs are invalid or the data cannot be fetched
    """
    if not urls or not isinstance(urls, list):
        raise ValueError("Invalid URLs. Please provide a list of URLs.")

    # fetch_data opens its own session per request, so the batch size below
    # is what actually limits the number of concurrent requests
    batch_size = 100

    results = []
    for i in range(0, len(urls), batch_size):  # iterate over the URLs in batches
        batch = urls[i : i + batch_size]  # get the current batch
        tasks = [
            asyncio.create_task(fetch_data(url, forceFresh)) for url in batch
        ]  # create a task for each URL in the batch
        results.extend(
            await asyncio.gather(*tasks)
        )  # await the batch and collect the results
    return results
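
A short usage sketch (the license IDs are placeholders):

    import asyncio
    from nmr_FAIR_DOs.utils import fetch_multiple

    async def main():
        urls = [f"https://spdx.org/licenses/{lid}.json" for lid in ("MIT", "Apache-2.0")]
        results = await fetch_multiple(urls)
        print(len(results))  # -> 2

    asyncio.run(main())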

encodeInBase64

encodeInBase64(data: str) -> str

Encodes the given data in Base64.

Parameters:

    Name  Type  Description         Default
    data  str   The data to encode  required

Returns:

    Type  Description
    str   The Base64 encoded data

Raises:

    Type        Description
    ValueError  If the data is None or empty

Source code in src/nmr_FAIR_DOs/utils.py
def encodeInBase64(data: str) -> str:
    """
    Encodes the given data in Base64.

    Args:
        data (str): The data to encode

    Returns:
        str: The Base64 encoded data

    Raises:
        ValueError: If the data is None or empty
    """
    if not data:
        raise ValueError("Data must not be None or empty")

    return base64.b64encode(data.encode("utf-8")).decode("utf-8")

decodeFromBase64

decodeFromBase64(data: str) -> str

Decodes the given Base64 encoded data.

Parameters:

    Name  Type  Description                        Default
    data  str   The Base64 encoded data to decode  required

Returns:

    Type  Description
    str   The decoded data

Raises:

    Type        Description
    ValueError  If the data is None or empty

Source code in src/nmr_FAIR_DOs/utils.py
def decodeFromBase64(data: str) -> str:
    """
    Decodes the given Base64 encoded data.

    Args:
        data (str): The Base64 encoded data to decode

    Returns:
        str: The decoded data

    Raises:
        ValueError: If the data is None or empty
    """
    if not data:
        raise ValueError("Data must not be None or empty")

    return base64.b64decode(data).decode("utf-8")
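
A quick round-trip sketch:

    from nmr_FAIR_DOs.utils import encodeInBase64, decodeFromBase64

    encoded = encodeInBase64("NMR FAIR DOs")  # -> "Tk1SIEZBSVIgRE9z"
    decodeFromBase64(encoded)                 # -> "NMR FAIR DOs"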

parseDateTime

parseDateTime(text: str) -> datetime

Parses a datetime from an arbitrary string.

Parameters:

    Name  Type  Description          Default
    text  str   The string to parse  required

Returns:

    Type      Description
    datetime  The parsed datetime

Raises:

    Type        Description
    ValueError  If the text is None or empty, or the datetime cannot be parsed

Source code in src/nmr_FAIR_DOs/utils.py
def parseDateTime(text: str) -> datetime:
    """
    Parses a datetime from an arbitrary string.

    Args:
        text (str): The string to parse

    Returns:
        datetime: The parsed datetime

    Raises:
        ValueError: If the text is None or empty or the datetime cannot be parsed
    """
    if not text:  # check if the text is empty
        raise ValueError("Text must not be None or empty")

    try:  # ISO 8601 is the most common case
        return datetime.fromisoformat(text)
    except ValueError:
        pass

    # fall back to a list of known formats
    for fmt in (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
    ):
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            continue

    raise ValueError("Could not parse datetime from text " + text)

parseSPDXLicenseURL async

parseSPDXLicenseURL(input_str: str) -> str

This function takes a string input and searches for a matching SPDX license URL.

Parameters:

    Name       Type  Description                                                          Default
    input_str  str   The input string to search for: a license name, SPDX ID, URL, etc.  required

Returns:

    Type  Description
    str   The SPDX license URL, or the input string unchanged if no match was found

Source code in src/nmr_FAIR_DOs/utils.py
async def parseSPDXLicenseURL(input_str: str) -> str:
    """
    This function takes a string input and searches for a matching SPDX license URL.

    Args:
        input_str (str): The input string to search for. This can be a license name, SPDX ID, URL, etc.

    Returns:
        str: The SPDX license URL
    """
    spdx_base_url = "https://spdx.org/licenses"
    file_format = "json"

    if input_str in known_licenses:  # check if the input string is already known
        logger.debug(
            f"Using cached license URL for {input_str}: {known_licenses[input_str]}"
        )
        return known_licenses[input_str]

    # fetch the list of licenses once
    available_licenses = await fetch_data(f"{spdx_base_url}/licenses.json")
    available_licenses = available_licenses["licenses"]

    for available_license in available_licenses:  # iterate over the licenses
        url = f"{spdx_base_url}/{available_license['licenseId']}.{file_format}"  # create the URL

        if (
            "reference" in available_license
            and input_str.lower() == available_license["reference"].lower()
        ):  # check if the input string is the reference (e.g. https://spdx.org/licenses/MIT.html)
            known_licenses[input_str] = url
            return url
        elif (
            "details" in available_license
            and input_str.lower() in available_license["details"].lower()
        ):  # check if the input string is in the details (e.g. https://spdx.org/licenses/MIT.json)
            known_licenses[input_str] = url
            return url
        elif (
            "licenseId" in available_license
            and input_str.lower() == available_license["licenseId"].lower()
        ):  # check if the input string is the available_license ID (e.g. MIT)
            known_licenses[input_str] = url
            return url
        elif (
            "seeAlso" in available_license
            and checkTextIsSimilar(input_str, available_license["seeAlso"])
        ):  # check if the input string is in the seeAlso list (e.g. [https://opensource.org/license/mit/])
            known_licenses[input_str] = url
            return url
        elif "name" in available_license and checkTextIsSimilar(
            input_str, available_license["name"]
        ):  # check if the input string is in the name (e.g. MIT License)
            known_licenses[input_str] = url
            return url
        elif "referenceNumber" in available_license and input_str == str(
            available_license["referenceNumber"]
        ):  # check if the input string is the reference number (e.g. 1)
            known_licenses[input_str] = url
            return url

    logger.warning(f"Could not parse license URL {input_str}")
    return input_str  # return the input string unchanged if no match was found
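
A usage sketch (hypothetical inputs; the exact match path depends on the live SPDX license list, but all three candidates should resolve to the same URL):

    import asyncio
    from nmr_FAIR_DOs.utils import parseSPDXLicenseURL

    async def main():
        for candidate in ("MIT", "MIT License", "https://opensource.org/licenses/MIT"):
            print(await parseSPDXLicenseURL(candidate))
            # -> https://spdx.org/licenses/MIT.json

    asyncio.run(main())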

checkTextIsSimilar

checkTextIsSimilar(
    original: str, target: list[str] | str
) -> bool

Checks if the original text is similar to the target text.

Parameters:

    Name      Type             Description                                 Default
    original  str              The original text                           required
    target    list[str] | str  The target text or a list of target texts  required

Returns:

    Type  Description
    bool  Whether the original text is similar to the target text

Source code in src/nmr_FAIR_DOs/utils.py
def checkTextIsSimilar(original: str, target: list[str] | str) -> bool:
    """
    Checks if the original text is similar to the target text.

    Args:
        original (str): The original text
        target (list[str]|str): The target text or a list of target texts

    Returns:
        bool: Whether the original text is similar to the target text
    """
    if isinstance(target, str):
        target = [target]

    def normalize(s: str) -> str:
        # remove case sensitivity, whitespace, URL prefixes, and file extensions
        s = s.lower().replace(" ", "")
        for token in ("https://", "http://", "www.", "legalcode"):
            s = s.replace(token, "")
        for ext in (".json", ".html", ".txt", ".md", ".xml", ".rdf"):
            s = s.replace(ext, "")
        # replace licenses with license to match SPDX URLs (e.g. https://opensource.org/licenses/MIT)
        s = s.replace("licenses", "license")
        # if there is a slash at the end of the URL, remove it
        if s.endswith("/"):
            s = s[:-1]
        return s

    original = normalize(original)
    for t in target:
        t = normalize(t)
        if original == t:  # check if the normalized strings are equal
            logger.debug(f"Found similar text: {original} == {t}")
            return True

    return False
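
An illustration of the normalization (hypothetical inputs):

    from nmr_FAIR_DOs.utils import checkTextIsSimilar

    checkTextIsSimilar(
        "http://www.opensource.org/licenses/MIT.html",
        "https://opensource.org/license/MIT/",
    )  # -> True: both normalize to "opensource.org/license/mit"

    checkTextIsSimilar("MIT License", "Apache License 2.0")  # -> False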