DataStore

Source code in blissdata/redis_engine/store.py
class DataStore:
    def __init__(
        self,
        url,
        identity_model_cls=ESRFIdentityModel,
        init_db=False,
    ):
        self._url = url
        self._redis = redis.Redis.from_url(url)

        # Define a finalizer to close all of the pool's sockets upon DataStore garbage collection
        redis_pool = self._redis.connection_pool
        close_func = type(redis_pool).disconnect
        weakref.finalize(self, close_func, redis_pool)

        class StreamModel(HashModel):
            """Actual stream model to be embedded into ScanModel, but not exposed to user directly.
            They are wrapped into Stream objects instead."""

            class Meta:
                global_key_prefix = identity_model_cls._meta.global_key_prefix
                model_key_prefix = "stream"
                database = (
                    identity_model_cls._meta.database
                )  # TODO use _UninitializedRedis directly

            # Accepts 'str' to stay compatible with older blissdata versions.
            # Because default values were not defined, an empty string was
            # stored instead of None.
            encoding: Optional[Union[dict, str]] = None
            info: Optional[Union[dict, str]] = None

        class ScanModel(JsonModel):
            class Meta:
                global_key_prefix = identity_model_cls._meta.global_key_prefix
                model_key_prefix = "scan"
                database = self._redis

            id: identity_model_cls
            info: dict
            state_stream: StreamModel
            data_streams: dict[str, StreamModel]

        self._stream_model = StreamModel
        self._scan_model = ScanModel

        if init_db:
            self._setup_redis_db()
        else:
            # Check protocol version
            db_protocol = self._redis.get(_PROTOCOL_KEY)
            if db_protocol is None:
                raise RuntimeError(
                    f"No blissdata protocol version found on {url} Redis database, make sure it is initialized."
                )
            else:
                db_protocol = int(db_protocol.decode())

            if _PROTOCOL_VERSION != db_protocol:
                raise RuntimeError(
                    f"Found blissdata protocol {db_protocol} on {url} Redis database, but only version {_PROTOCOL_VERSION} is supported."
                )

    def _setup_redis_db(self):
        """Setup Redis database for blissdata needs. To prevent it is not call by accident,
        this function only accepts to execute on an empty database. Therefore, you need to
        flush the database first if you want to re-apply setup.

        setup steps:
            - Load data models on RediSearch to enable server-side indexing
            - Increase MAXPREFIXEXPANSIONS for RediSearch to not limit search query length
            - Upload stream sealing function to (required for atomic sealing)
            - Write protocol version into Redis for clients
        """
        # Ensure the database is empty
        nb_keys = self._redis.dbsize()
        if nb_keys:
            raise RuntimeError(
                f"Cannot re-index a non-empty Redis database, {nb_keys} keys are already present"
            )

        # Configure RediSearch indexing with redis-om, but block the redis-om mechanism which auto-discovers
        # any model class for indexing. Instead we set model_registry by hand to only enable self._scan_model
        # indexing and not interfere with existing DataStores.
        from redis_om.model import model

        scan_mdl = self._scan_model
        model.model_registry = {
            f"{scan_mdl.__module__}.{scan_mdl.__qualname__}": scan_mdl
        }
        Migrator().run()

        # TODO how to ensure this is always greater than the number of scans in Redis?
        # Otherwise search results may be truncated without warning.
        self._redis.ft().config_set("MAXPREFIXEXPANSIONS", 2000)

        # Server side sealing function:
        # For the seal to contain the complete length of the stream, we
        # need to read the last entry first. Also, both commands should be
        # executed atomically.
        # Because command 2 depends on command 1, this can't be done within
        # a transaction, thus we need a server side function.
        stream_seal_func = """#!lua name=mylib
local function seal_stream(keys, args)
  local stream = keys[1]
  local seal = '18446744073709551615-18446744073709551615'
  local entry = redis.call('XREVRANGE', stream, '+', '-', 'COUNT', 1)[1]

  local id
  local value
  if entry == nil then
      id = -1
      value = {}
  elseif entry[1] == seal then
    error(keys[1].." is already sealed.")
  else
      id = tonumber(string.match(entry[1], "([^-]+)"))
      value = entry[2]
  end

  local len = 1
  for i, sub in ipairs(value) do
      if sub == 'len' then
          len = value[i+1]
      end
  end
  local sealing_id = len + id
  redis.call('XADD', stream, seal, 'id', sealing_id)
  return sealing_id
end

redis.register_function('seal_stream', seal_stream)"""
        self._redis.function_load(code=stream_seal_func, replace=True)
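        # Streams can then be sealed atomically on the server side, e.g. (a sketch;
        # "stream_key" is a hypothetical placeholder and redis-py >= 4.2 is assumed
        # for the fcall() wrapper):
        #   sealing_id = self._redis.fcall("seal_stream", 1, stream_key)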

        self._redis.set(_PROTOCOL_KEY, _PROTOCOL_VERSION)

    def load_scan(self, key):
        return Scan._load(self, key)

    def create_scan(self, identity, info={}):
        return Scan._create(self, identity, info)

    def get_last_scan(self):
        """Find the latest scan created. If it was deleted, then the previous one is returned, etc.
        Raises NoScanAvailable if no scan exists.
        return: (timetag, scan_key)
        """
        max = "+"
        while True:
            raw = self._redis.xrevrange(scan_creation_stream, count=1, max=max)

            if not raw:
                raise NoScanAvailable
            else:
                timetag, data = raw[0]
                timetag = timetag.decode()
                key = data[b"key"].decode()

            ttl = self._redis.ttl(key)
            if ttl != -1 and ttl <= 10:
                # scan was deleted, skip that one
                a, b = [int(x) for x in timetag.split("-")]
                if b == 0:
                    max = f"{a - 1}-{2**64-1}"
                else:
                    max = f"{a}-{b - 1}"
                continue

            return timetag, key

    def get_last_scan_timetag(self) -> str:
        """Return the timetag of the last scan or '0-0' when there is no scan.
        Useful to save a starting point before telling the publisher you're ready.
        """
        timetag = self._redis.xrevrange(scan_creation_stream, count=1)
        if timetag:
            return timetag[0][0].decode()
        else:
            return "0-0"

    def get_next_scan(self, since=None, block=True, timeout=0):
        """Blocking function waiting for the next scan, starting from now or a specific moment.
        since: Moment after which the next scan is awaited; passing None means "from now".
            In order to iterate over new scans without missing any between subsequent calls, be sure to
            start from the previously returned timetag each time, e.g.:
            >    prev_timetag = None
            >    while True:
            >        prev_timetag, scan = get_next_scan(since=prev_timetag)
            >        # process scan
            Note that search functions also return a timetag, which can be used to get the first scan
            created after the search request, thus ensuring no scan is missed in between.
        timeout: Given in seconds, zero means infinite timeout. Raises a NoScanAvailable exception when it expires.
        return: (timetag, scan_key)
        """
        if since is None:
            since = "$"

        if not block:
            timeout = None
        else:
            timeout = int(timeout * 1000)

        while True:
            if timeout not in [None, 0]:
                start = time.perf_counter()

            raw = self._redis.xread(
                {scan_creation_stream: since},
                block=timeout,
                count=1,
            )

            if timeout not in [None, 0]:
                stop = time.perf_counter()
                timeout -= int((stop - start) * 1000)
                if timeout <= 0:
                    timeout = None

            if not raw:
                raise NoScanAvailable
            else:
                timetag, data = raw[0][1][0]
                timetag = timetag.decode()
                key = data[b"key"].decode()

            ttl = self._redis.ttl(key)
            if ttl != -1 and ttl <= 10:
                # scan was deleted, skip that one
                since = timetag
                continue

            return timetag, key

    def _timestamped_pipeline(self, func):
        """Execute func(pipe) and get the last scan created timetag in an atomic way."""

        def catch_time_and_aggregate(pipe: redis.client.Pipeline) -> None:
            pipe.xrevrange(scan_creation_stream, count=1)
            func(pipe)

        timetag, raw_result = self._redis.transaction(catch_time_and_aggregate)

        if timetag:
            timetag = timetag[0][0].decode()
        else:
            timetag = "0-0"

        return timetag, raw_result

    def search_existing_scans(self, **kwargs):
        """Search for scans matching with ScanModel.id fields matching with those provided in kwargs.

        Example (assuming there are "name" and "dataset" fields in ScanModel.id):
            timetag, scan_keys = search_existing_scans(name="myscan", dataset="abc654")

        A wildcard can be used for prefix/infix/suffix matches in each field, for example:
            name="abcd*" OR name="*efgh*" OR name="*ijkl"
            But "abcd*ijkl" is forbidden as it is matching on both prefix and suffix.

        Return the following tuple:
            (<timetag>, [<scan key>, ...])
            Where timetag corresponds to the last scan creation event at the time of the search.
            It can be used by get_next_scan(since=timetag) to wait for any scan created after that search.
        """
        # Escape all RediSearch special characters except "*"
        escape_table = str.maketrans(
            {c: f"\\{c}" for c in ",.<>{}[]\"':;!@#$%^&()-+=~?/ "}
        )

        assert "since" not in kwargs
        if not kwargs:
            query_string = "*"
        else:
            query_items = {}
            for k, v in kwargs.items():
                if isinstance(v, Number):
                    query_items[f"id_{k}"] = f"[{v} {v}]"
                else:
                    query_items[f"id_{k}"] = tags(v.translate(escape_table))
            query_string = querystring(**query_items)

        max_count = 1000
        while True:
            query = Query(query_string).paging(0, max_count).no_content()
            timetag, raw_result = self._timestamped_search(query)
            count = raw_result[0]
            if count <= max_count:
                break
            else:
                # In the unlikely case more than max_count results are available,
                # increase max_count and retry.
                max_count = count

        return timetag, [key.decode() for key in raw_result[1:]]

    def _timestamped_search(self, query):
        def search(pipe: redis.client.Pipeline) -> None:
            pipe.ft("esrf:scan:index").search(query)

        return self._timestamped_pipeline(search)

    def _timestamped_aggregate(self, aggregate_request):
        def aggregate(pipe: redis.client.Pipeline) -> None:
            pipe.ft("esrf:scan:index").aggregate(aggregate_request)

        return self._timestamped_pipeline(aggregate)

    def get_scans_state(self, scan_keys: list[str]):
        """Return the current state of a list of scans from their keys in an efficient way.
        This is done by only requesting the last state in the state stream of each scan, without
        syncing the JSON content as Scan._load() would do.
        """
        if not scan_keys:
            return {}

        raw = self._redis.json().mget(scan_keys, "state_stream.pk")
        stream_keys = [self._stream_model.make_key(pk) for pk in raw]

        pipe = self._redis.pipeline(transaction=False)
        for stream_key in stream_keys:
            pipe.xrevrange(stream_key, count=1)
        raw = pipe.execute()
        scan_states = [ScanState(val[0][1][b"state"].decode()) for val in raw]

        return {key: state for key, state in zip(scan_keys, scan_states)}

    def delete_scan(self, scan_key, force=False):
        """Delete a scan which is already closed or raise a RuntimeError.
        If force is True, the scan will be closed first if not terminated.

        Raises:
            ScanNotFoundError: the scan doesn't exist (or it is already deleted).
            RuntimeError: the scan is not terminated and force is False.

        Note: Deletion actually sets a 10-second expiration time on the scan.
        Consequently, any scan with an expiration of 10 seconds or less is considered deleted.
        By doing this, we avoid race conditions where the scan would disappear in the middle
        of some transactions."""
        try:
            scan = Scan._load_rw(self, scan_key)
        except ScanNotFoundError:
            return

        if scan.state < ScanState.CLOSED:
            # Never delete a scan without eventually closing it; this allows
            # clients blocked in scan.update() to leave.
            # Then, the key's time-to-live will prevent others from entering blocking calls.
            if force:
                # close() also seals any opened stream, making any reader leave.
                scan.info["end_reason"] = "DELETION"
                scan.close()
            else:
                raise RuntimeError(
                    f"Scan {scan_key} is not terminated, use force=True to delete anyway."
                )

        def delete_scan_keys(pipe: redis.client.Pipeline) -> None:
            keys = [stream.key() for stream in scan._model.data_streams.values()]
            keys.append(scan._model.state_stream.key())
            keys.append(scan_key)
            for key in keys:
                # Set a small expiration time instead of deleting immediately.
                # This is to prevent race conditions: running operations have time to terminate,
                # but new ones check the expiration time and consider the key deleted.
                pipe.expire(key, 10)

        self._redis.transaction(delete_scan_keys)

__init__(url, identity_model_cls=ESRFIdentityModel, init_db=False)

Source code in blissdata/redis_engine/store.py
def __init__(
    self,
    url,
    identity_model_cls=ESRFIdentityModel,
    init_db=False,
):
    self._url = url
    self._redis = redis.Redis.from_url(url)

    # Define a finalizer to close all of the pool's sockets upon DataStore garbage collection
    redis_pool = self._redis.connection_pool
    close_func = type(redis_pool).disconnect
    weakref.finalize(self, close_func, redis_pool)

    class StreamModel(HashModel):
        """Actual stream model to be embedded into ScanModel, but not exposed to user directly.
        They are wrapped into Stream objects instead."""

        class Meta:
            global_key_prefix = identity_model_cls._meta.global_key_prefix
            model_key_prefix = "stream"
            database = (
                identity_model_cls._meta.database
            )  # TODO use _UninitializedRedis directly

        # Accepts 'str' to stay compatible with older blissdata versions.
        # Because default values were not defined, an empty string was
        # stored instead of None.
        encoding: Optional[Union[dict, str]] = None
        info: Optional[Union[dict, str]] = None

    class ScanModel(JsonModel):
        class Meta:
            global_key_prefix = identity_model_cls._meta.global_key_prefix
            model_key_prefix = "scan"
            database = self._redis

        id: identity_model_cls
        info: dict
        state_stream: StreamModel
        data_streams: dict[str, StreamModel]

    self._stream_model = StreamModel
    self._scan_model = ScanModel

    if init_db:
        self._setup_redis_db()
    else:
        # Check protocol version
        db_protocol = self._redis.get(_PROTOCOL_KEY)
        if db_protocol is None:
            raise RuntimeError(
                f"No blissdata protocol version found on {url} Redis database, make sure it is initialized."
            )
        else:
            db_protocol = int(db_protocol.decode())

        if _PROTOCOL_VERSION != db_protocol:
            raise RuntimeError(
                f"Found blissdata protocol {db_protocol} on {url} Redis database, but only version {_PROTOCOL_VERSION} is supported."
            )
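
A minimal connection sketch (the Redis URL is an assumption; init_db=True is only for the first-time setup of an empty database):

    from blissdata.redis_engine.store import DataStore

    # Connect to an already-initialized database; raises RuntimeError if the
    # blissdata protocol version is missing or unsupported.
    data_store = DataStore("redis://localhost:6379")

    # First deployment only: initialize an empty database instead.
    # data_store = DataStore("redis://localhost:6379", init_db=True)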

delete_scan(scan_key, force=False)

Delete a scan which is already closed or raise a RuntimeError. If force is True, the scan will be closed first if not terminated.

Raises:

    ScanNotFoundError: the scan doesn't exist (or it is already deleted).
    RuntimeError: the scan is not terminated and force is False.

Note: Deletion actually sets a 10-second expiration time on the scan. Consequently, any scan with an expiration of 10 seconds or less is considered deleted. By doing this, we avoid race conditions where the scan would disappear in the middle of some transactions.
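
A usage sketch (data_store and scan_key are assumed to come from the surrounding application):

    # scan_key as returned by get_last_scan() or search_existing_scans()
    try:
        data_store.delete_scan(scan_key)
    except RuntimeError:
        # The scan is still running: close it first, then delete it.
        data_store.delete_scan(scan_key, force=True)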

Source code in blissdata/redis_engine/store.py
def delete_scan(self, scan_key, force=False):
    """Delete a scan which is already closed or raise a RuntimeError.
    If force is True, the scan will be closed first if not terminated.

    Raises:
        ScanNotFoundError: the scan doesn't exist (or it is already deleted).
        RuntimeError: the scan is not terminated and force is False.

    Note: Deletion actually sets a 10-second expiration time on the scan.
    Consequently, any scan with an expiration of 10 seconds or less is considered deleted.
    By doing this, we avoid race conditions where the scan would disappear in the middle
    of some transactions."""
    try:
        scan = Scan._load_rw(self, scan_key)
    except ScanNotFoundError:
        return

    if scan.state < ScanState.CLOSED:
        # Never delete a scan without eventually closing it; this allows
        # clients blocked in scan.update() to leave.
        # Then, the key's time-to-live will prevent others from entering blocking calls.
        if force:
            # close() also seals any opened stream, making any reader leave.
            scan.info["end_reason"] = "DELETION"
            scan.close()
        else:
            raise RuntimeError(
                f"Scan {scan_key} is not terminated, use force=True to delete anyway."
            )

    def delete_scan_keys(pipe: redis.client.Pipeline) -> None:
        keys = [stream.key() for stream in scan._model.data_streams.values()]
        keys.append(scan._model.state_stream.key())
        keys.append(scan_key)
        for key in keys:
            # Set a small expiration time instead of deleting immediately.
            # This is to prevent race conditions: running operations have time to terminate,
            # but new ones check the expiration time and consider the key deleted.
            pipe.expire(key, 10)

    self._redis.transaction(delete_scan_keys)

get_last_scan()

Find the latest scan created. If it was deleted, then the previous one is returned, etc.

Raises NoScanAvailable if no scan exists.

Returns: (timetag, scan_key)
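
A usage sketch (data_store is assumed; NoScanAvailable's import path is an assumption):

    from blissdata.redis_engine.exceptions import NoScanAvailable  # assumed path

    try:
        timetag, scan_key = data_store.get_last_scan()
        scan = data_store.load_scan(scan_key)
    except NoScanAvailable:
        scan = None  # no scan exists yet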

Source code in blissdata/redis_engine/store.py
def get_last_scan(self):
    """Find the latest scan created. If it was deleted, then the previous one is returned, etc.
    Raises NoScanAvailable if no scan exists.
    return: (timetag, scan_key)
    """
    max = "+"
    while True:
        raw = self._redis.xrevrange(scan_creation_stream, count=1, max=max)

        if not raw:
            raise NoScanAvailable
        else:
            timetag, data = raw[0]
            timetag = timetag.decode()
            key = data[b"key"].decode()

        ttl = self._redis.ttl(key)
        if ttl != -1 and ttl <= 10:
            # scan was deleted, skip that one
            a, b = [int(x) for x in timetag.split("-")]
            if b == 0:
                max = f"{a - 1}-{2**64-1}"
            else:
                max = f"{a}-{b - 1}"
            continue

        return timetag, key

get_last_scan_timetag()

Return the timetag of the last scan or '0-0' when there is no scan. Useful to save a starting point before telling the publisher you're ready.
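
A sketch of the save-a-starting-point pattern (data_store is assumed):

    # Record a starting point first, then signal readiness to the publisher;
    # scans created in between are picked up by get_next_scan(since=...).
    since = data_store.get_last_scan_timetag()
    # ... signal readiness to the publisher here ...
    timetag, scan_key = data_store.get_next_scan(since=since)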

Source code in blissdata/redis_engine/store.py
def get_last_scan_timetag(self) -> str:
    """Return the timetag of the last scan or '0-0' when there is no scan.
    Useful to save a starting point before telling the publisher you're ready.
    """
    timetag = self._redis.xrevrange(scan_creation_stream, count=1)
    if timetag:
        return timetag[0][0].decode()
    else:
        return "0-0"

get_next_scan(since=None, block=True, timeout=0)

Blocking function waiting for the next scan, starting from now or a specific moment.

since: Moment after which the next scan is awaited; passing None means "from now".
    In order to iterate over new scans without missing any between subsequent calls, be sure to
    start from the previously returned timetag each time, e.g.:

        prev_timetag = None
        while True:
            prev_timetag, scan = get_next_scan(since=prev_timetag)
            # process scan

    Note that search functions also return a timetag, which can be used to get the first scan
    created after the search request, thus ensuring no scan is missed in between.
timeout: Given in seconds, zero means infinite timeout. Raises a NoScanAvailable exception when it expires.

Returns: (timetag, scan_key)
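
A sketch of a bounded wait (data_store and prev_timetag are assumed; NoScanAvailable's import path is an assumption):

    try:
        # Wait at most 5 seconds for the next scan created after prev_timetag.
        prev_timetag, scan_key = data_store.get_next_scan(since=prev_timetag, timeout=5)
    except NoScanAvailable:
        pass  # timeout expired without a new scan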

Source code in blissdata/redis_engine/store.py
def get_next_scan(self, since=None, block=True, timeout=0):
    """Blocking function waiting for the next scan, starting from now or a specific moment.
    since: Moment after which the next scan is awaited; passing None means "from now".
        In order to iterate over new scans without missing any between subsequent calls, be sure to
        start from the previously returned timetag each time, e.g.:
        >    prev_timetag = None
        >    while True:
        >        prev_timetag, scan = get_next_scan(since=prev_timetag)
        >        # process scan
        Note that search functions also return a timetag, which can be used to get the first scan
        created after the search request, thus ensuring no scan is missed in between.
    timeout: Given in seconds, zero means infinite timeout. Raises a NoScanAvailable exception when it expires.
    return: (timetag, scan_key)
    """
    if since is None:
        since = "$"

    if not block:
        timeout = None
    else:
        timeout = int(timeout * 1000)

    while True:
        if timeout not in [None, 0]:
            start = time.perf_counter()

        raw = self._redis.xread(
            {scan_creation_stream: since},
            block=timeout,
            count=1,
        )

        if timeout not in [None, 0]:
            stop = time.perf_counter()
            timeout -= int((stop - start) * 1000)
            if timeout <= 0:
                timeout = None

        if not raw:
            raise NoScanAvailable
        else:
            timetag, data = raw[0][1][0]
            timetag = timetag.decode()
            key = data[b"key"].decode()

        ttl = self._redis.ttl(key)
        if ttl != -1 and ttl <= 10:
            # scan was deleted, skip that one
            since = timetag
            continue

        return timetag, key

get_scans_state(scan_keys)

Return the current state of a list of scans from their keys in an efficient way. This is done by only requesting the last state in the state stream of each scan, without syncing the JSON content as Scan._load() would do.
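
A usage sketch (data_store and the "name" field are assumptions; the returned values are ScanState members):

    _, scan_keys = data_store.search_existing_scans(name="myscan")
    states = data_store.get_scans_state(scan_keys)
    for key, state in states.items():
        print(key, state)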

Source code in blissdata/redis_engine/store.py
def get_scans_state(self, scan_keys: list[str]):
    """Return the current state of a list of scans from their keys in an efficient way.
    This is done by only requesting the last state in the state stream of each scan, without
    syncing the JSON content as Scan._load() would do.
    """
    if not scan_keys:
        return {}

    raw = self._redis.json().mget(scan_keys, "state_stream.pk")
    stream_keys = [self._stream_model.make_key(pk) for pk in raw]

    pipe = self._redis.pipeline(transaction=False)
    for stream_key in stream_keys:
        pipe.xrevrange(stream_key, count=1)
    raw = pipe.execute()
    scan_states = [ScanState(val[0][1][b"state"].decode()) for val in raw]

    return {key: state for key, state in zip(scan_keys, scan_states)}

search_existing_scans(**kwargs)

Search for scans whose ScanModel.id fields match those provided in kwargs.

Example (assuming there are "name" and "dataset" fields in ScanModel.id):

    timetag, scan_keys = search_existing_scans(name="myscan", dataset="abc654")

A wildcard can be used for prefix/infix/suffix matches in each field, for example: name="abcd*" OR name="*efgh*" OR name="*ijkl". But "abcd*ijkl" is forbidden as it matches on both prefix and suffix.

Return the following tuple:

    (<timetag>, [<scan key>, ...])

Where timetag corresponds to the last scan creation event at the time of the search. It can be used by get_next_scan(since=timetag) to wait for any scan created after that search.
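
A sketch of the gap-free search-then-follow pattern described above (data_store is assumed; "name" is an example field and process() is hypothetical):

    # Enumerate matching scans, then follow new ones without missing any
    # created between the search and the first get_next_scan() call.
    timetag, scan_keys = data_store.search_existing_scans(name="myscan*")
    for key in scan_keys:
        process(data_store.load_scan(key))
    while True:
        timetag, key = data_store.get_next_scan(since=timetag)
        process(data_store.load_scan(key))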

Source code in blissdata/redis_engine/store.py
def search_existing_scans(self, **kwargs):
    """Search for scans matching with ScanModel.id fields matching with those provided in kwargs.

    Example (assuming there are "name" and "dataset" fields in ScanModel.id):
        timetag, scan_keys = search_existing_scans(name="myscan", dataset="abc654")

    A wildcard can be used for prefix/infix/suffix matches in each field, for example:
        name="abcd*" OR name="*efgh*" OR name="*ijkl"
        But "abcd*ijkl" is forbidden as it is matching on both prefix and suffix.

    Return the following tuple:
        (<timetag>, [<scan key>, ...])
        Where timetag corresponds to the last scan creation event at the time of the search.
        It can be used by get_next_scan(since=timetag) to wait for any scan created after that search.
    """
    # Escape all RediSearch special characters except "*"
    escape_table = str.maketrans(
        {c: f"\\{c}" for c in ",.<>{}[]\"':;!@#$%^&()-+=~?/ "}
    )

    assert "since" not in kwargs
    if not kwargs:
        query_string = "*"
    else:
        query_items = {}
        for k, v in kwargs.items():
            if isinstance(v, Number):
                query_items[f"id_{k}"] = f"[{v} {v}]"
            else:
                query_items[f"id_{k}"] = tags(v.translate(escape_table))
        query_string = querystring(**query_items)

    max_count = 1000
    while True:
        query = Query(query_string).paging(0, max_count).no_content()
        timetag, raw_result = self._timestamped_search(query)
        count = raw_result[0]
        if count <= max_count:
            break
        else:
            # In the unlikely case more than max_count results are available,
            # increase max_count and retry.
            max_count = count

    return timetag, [key.decode() for key in raw_result[1:]]