嗨,我想分享一个游标分页模式(或游标分页模式)的示例,因为当我搜索一个时,我只能找到向前移动但不能向后移动的情况的示例,也无法找到如何处理开始和结束的数据。
您可以在此处查看此内容的存储库,但我将尝试在这里解释所有内容。
我使用 python poetry 作为包管理工具,因此对于这个示例,我假设您已经拥有它。首先要做的是使用诗歌安装来安装依赖项。您还可以使用 pip 来安装它们: pip install pymongo loguru。
现在我们还需要一个mongo数据库,你可以在这里下载mongodb社区版,并且可以按照本指南进行配置。
现在我们已经安装了依赖项和数据库,我们可以向其中添加数据。为此,我们可以使用这个:
from pymongo import mongoclient # data to add sample_posts = [ {"title": "post 1", "content": "content 1", "date": datetime(2023, 8, 1)}, {"title": "post 2", "content": "content 2", "date": datetime(2023, 8, 2)}, {"title": "post 3", "content": "content 3", "date": datetime(2023, 8, 3)}, {"title": "post 4", "content": "content 4", "date": datetime(2023, 8, 4)}, {"title": "post 5", "content": "content 5", "date": datetime(2023, 8, 5)}, {"title": "post 6", "content": "content 6", "date": datetime(2023, 8, 6)}, {"title": "post 7", "content": "content 7", "date": datetime(2023, 8, 7)}, {"title": "post 8", "content": "content 8", "date": datetime(2023, 8, 8)}, {"title": "post 9", "content": "content 9", "date": datetime(2023, 8, 9)}, {"title": "post 10", "content": "content 10", "date": datetime(2023, 8, 10)}, {"title": "post 11", "content": "content 11", "date": datetime(2023, 8, 11)}, ] # creating connection token = "mongodb://localhost:27017" client = mongoclient(token) cursor_db = client.cursor_db.content cursor_db.insert_many(sample_posts)
这样我们就可以创建到本地数据库到集合内容的连接。然后我们将 sample_posts 中的值添加到其中。现在我们有了要搜索的数据,我们可以开始查询它。让我们开始搜索并读取数据,直到结束。
# import libraries from bson.objectid import objectid from datetime import datetime from loguru import logger from pymongo import mongoclient # use token to connect to local database token = "mongodb://localhost:27017" client = mongoclient(token) # access cursor_db collection (it will be created if it does not exist) cursor_db = client.cursor_db.content default_page_size = 5 def fetch_next_page(cursor, page_size = none): # use the provided page_size or use a default value page_size = page_size or default_page_size # check if there is a cursor if cursor: # get documents with `_id` greater than the cursor query = {"_id": {'$gt': cursor}} else: # get everything query = {} # sort in ascending order by `_id` sort_order = 1 # define the aggregation pipeline pipeline = [ {"$match": query}, # filter based on the cursor {"$sort": {"_id": sort_order}}, # sort documents by `_id` {"$limit": page_size + 1}, # limit results to page_size + 1 to check if there's a next page # {"$project": {"_id": 1, "title": 1, "content": 1}} # in case you want to return only certain attributes ] # execute the aggregation pipeline results = list(cursor_db.aggregate(pipeline)) # logger.debug(results) # validate if some data was found if not results: raise valueerror("no data found") # check if there are more documents than the page size if len(results) > page_size: # deleting extra document results.pop(-1) # set the cursor for the next page next_cursor = results[-1]['_id'] # set the previous cursor if cursor: # in case the cursor have data prev_cursor = results[0]['_id'] else: # in case the cursor don't have data (first page) prev_cursor = none # indicate you haven't reached the end of the data at_end = false else: # indicate that there are not more pages available (last page reached) next_cursor = none # set the cursor for the previous page prev_cursor = results[0]['_id'] # indicate you have reached the end of the data at_end = true return results, next_cursor, prev_cursor, at_end @logger.catch def main(): """main function.""" # get the first page results, next_cursor, prev_cursor, at_end = fetch_next_page(none) logger.info(f"{results = }") logger.info(f"{next_cursor = }") logger.info(f"{prev_cursor = }") logger.info(f"{at_end = }") if __name__: main() logger.info("--- execution end ---")
该代码返回:
2024-09-02 08:55:24.388 | info | __main__:main:73 - results = [{'_id': objectid('66bdfdcf7a0667fd1888c20c'), 'title': 'post 1', 'content': 'content 1', 'date': datetime.datetime(2023, 8, 1, 0, 0)}, {'_id': objectid('66bdfdcf7a0667fd1888c20d'), 'title': 'post 2', 'content': 'content 2', 'date': datetime.datetime(2023, 8, 2, 0, 0)}, {'_id': objectid('66bdfdcf7a0667fd1888c20e'), 'title': 'post 3', 'content': 'content 3', 'date': datetime.datetime(2023, 8, 3, 0, 0)}, {'_id': objectid('66bdfdcf7a0667fd1888c20f'), 'title': 'post 4', 'content': 'content 4', 'date': datetime.datetime(2023, 8, 4, 0, 0)}, {'_id': objectid('66bdfdcf7a0667fd1888c210'), 'title': 'post 5', 'content': 'content 5', 'date': datetime.datetime(2023, 8, 5, 0, 0)}] 2024-09-02 08:55:24.388 | info | __main__:main:74 - next_cursor = objectid('66bdfdcf7a0667fd1888c210') 2024-09-02 08:55:24.388 | info | __main__:main:75 - prev_cursor = none 2024-09-02 08:55:24.388 | info | __main__:main:76 - at_end = false 2024-09-02 08:55:24.388 | info | __main__:<module>:79 - --- execution end ---
可以看到光标指向下一页,而上一页为none,也说明还没有到数据的末尾。为了获得这个值,我们必须更好地了解函数 fetch_next_page。在那里我们可以看到我们定义了 page_size、查询、sort_order,然后我们创建了聚合操作的管道。为了确定是否存在另一页信息,我们使用 $limit 运算符,我们给出 page_size + 1 的值来检查实际上是否存在具有该 + 1 的另一页。要实际检查它,我们使用表达式 len( results) > page_size,如果返回的数据数大于page_size则还有一个页面;相反,这是最后一页。
对于有下一页的情况,我们必须从我们查询的信息列表中删除最后一个元素,因为那是管道中的+1,我们需要使用当前最后一个值中的_id来设置next_cursor列表中,根据情况设置prev_cursor(前一个游标),如果有游标则说明在这之前有数据,否则说明这是第一组数据,所以有没有先前的信息,因此,光标应该是找到的数据中的第一个 _id 或 none。
现在我们知道如何搜索数据并添加一些重要的验证,我们必须启用一种向前遍历数据的方法,为此我们将使用输入命令请求运行脚本的用户写入移动方向,不过,现在它只会向前(f)。我们可以更新我们的 main 函数来做到这一点:
@logger.catch def main(): """main function.""" # get the first page results, next_cursor, prev_cursor, at_end = fetch_next_page(none) logger.info(f"{results = }") logger.info(f"{next_cursor = }") logger.info(f"{prev_cursor = }") logger.info(f"{at_end = }") # checking if there is more data to show if next_cursor: # enter a cycle to traverse the data while(true): print(125 * "*") # ask for the user to move forward or cancel the execution inn = input("can only move forward (f) or cancel (c): ") # execute action acording to the input if inn == "f": results, next_cursor, prev_cursor, at_end = fetch_next_page(next_cursor, default_page_size) elif inn == "c": logger.warning("------- canceling execution -------") break else: # in case the user sends something that is not a valid option print("not valid action, it can only move in the opposite direction.") continue logger.info(f"{results = }") logger.info(f"{next_cursor = }") logger.info(f"{prev_cursor = }") logger.info(f"{at_end = }") else: logger.warning("there is not more data to show")
这样我们就可以遍历数据直到结束,但是当到达结束时它会返回到开头并再次开始循环,因此我们必须添加一些验证来避免这种情况并向后移动。为此,我们将创建函数 fetch_previous_page 并对 main 函数添加一些更改:
def fetch_previous_page(cursor, page_size = none): # use the provided page_size or fallback to the class attribute page_size = page_size or default_page_size # check if there is a cursor if cursor: # get documents with `_id` less than the cursor query = {'_id': {'$lt': cursor}} else: # get everything query = {} # sort in descending order by `_id` sort_order = -1 # define the aggregation pipeline pipeline = [ {"$match": query}, # filter based on the cursor {"$sort": {"_id": sort_order}}, # sort documents by `_id` {"$limit": page_size + 1}, # limit results to page_size + 1 to check if there's a next page # {"$project": {"_id": 1, "title": 1, "content": 1}} # in case you want to return only certain attributes ] # execute the aggregation pipeline results = list(cursor_db.aggregate(pipeline)) # validate if some data was found if not results: raise valueerror("no data found") # check if there are more documents than the page size if len(results) > page_size: # deleting extra document results.pop(-1) # reverse the results to maintain the correct order results.reverse() # set the cursor for the previous page prev_cursor = results[0]['_id'] # set the cursor for the next page next_cursor = results[-1]['_id'] # indicate you are not at the start of the data at_start = false else: # reverse the results to maintain the correct order results.reverse() # indicate that there are not more previous pages available (initial page reached) prev_cursor = none # !!!! next_cursor = results[-1]['_id'] # indicate you have reached the start of the data at_start = true return results, next_cursor, prev_cursor, at_start
与 fetch_next_page 极其相似,但查询(如果满足条件)使用运算符 $lt 并且 sort_order 必须为 -1 才能按所需顺序获取数据。现在,在验证 if len(results) > page_size 时,如果条件为 true,则会删除多余的元素并反转数据的顺序以使其正确显示,然后将前一个光标设置为数据的第一个元素和下一个光标到最后一个元素。反之,数据相反,前一个光标设置为none(因为没有之前的数据),并将下一个光标设置为列表的最后一个值。在这两种情况下,都会定义一个名为 at_start 的布尔变量来识别这种情况。现在我们必须在主函数中添加与用户后退的交互,因此如果我们位于数据的开头、结尾或中间,则需要处理 3 种情况:仅前进、仅后退,以及前进或后退:
@logger.catch def main(): """main function.""" # get the first page results, next_cursor, prev_cursor, at_end = fetch_next_page(none) logger.info(f"{results = }") logger.info(f"{next_cursor = }") logger.info(f"{prev_cursor = }") logger.info(f"{at_end = }") # checking if there is more data to show if not(at_start and at_end): # enter a cycle to traverse the data while(true): print(125 * "*") # ask for the user to move forward or cancel the execution if at_end: inn = input("can only move backward (b) or cancel (c): ") stage = 0 elif at_start: inn = input("can only move forward (f) or cancel (c): ") stage = 1 else: inn = input("can move forward (f), backward (b), or cancel (c): ") stage = 2 # execute action acording to the input if inn == "f" and stage in [1, 2]: results, next_cursor, prev_cursor, at_end = fetch_next_page(next_cursor, page_size) # for this example, you must reset here the value, otherwise you lose the reference of the cursor at_start = false elif inn == "b" and stage in [0, 2]: results, next_cursor, prev_cursor, at_start = fetch_previous_page(prev_cursor, page_size) # for this example, you must reset here the value, otherwise you lose the reference of the cursor at_end = false elif inn == "c": logger.warning("------- canceling execution -------") break else: print("not valid action, it can only move in the opposite direction.") continue logger.info(f"{results = }") logger.info(f"{next_cursor = }") logger.info(f"{prev_cursor = }") logger.info(f"{at_start = }") logger.info(f"{at_end = }") else: logger.warning("there is not more data to show")
我们对用户输入添加了验证,以识别我们在遍历数据时所处的阶段,还要注意分别执行 fetch_next_page 和 fetch_previous_page 后的 at_start 和 at_end,在达到这些阶段后需要重置限制。现在您可以到达数据的末尾并向后移动直到开始。获取第一页数据后的验证已更新,以检查标志 at_start 和 at_end 是否为 true,这将表明没有更多数据可显示。
注意:我此时遇到了一个错误,我现在无法重现该错误,但它在向后移动并到达开头时导致了问题,光标指向错误的地方,当你想继续前进时,它会跳过 1 个元素。为了解决这个问题,我在 fetch_previous_page 中添加了一个验证,如果一个名为 prev_at_start 的参数(这是 at_start 的先前值)来分配 next_cursor 值 results[0]['_id'] 或 results[-1]['_id'] 中如果前一阶段不在数据的开头。从现在开始,这一点将被省略,但我认为值得一提。
现在我们可以从头到尾遍历数据并向前或向后遍历数据,我们可以创建一个具有所有这些功能的类并调用它来使用示例。此外,我们还必须添加文档字符串,以便所有内容都是正确的文档。结果如下代码所示:
"""Cursor Paging/Pagination Pattern Example.""" from bson.objectid import ObjectId from datetime import datetime from loguru import logger from pymongo import MongoClient class cursorPattern: """ A class to handle cursor-based pagination for MongoDB collections. Attributes: ----------- cursor_db : pymongo.collection.Collection The MongoDB collection used for pagination. page_size : int Size of the pages. """ def __init__(self, page_size: int = 5) -> None: """Initializes the class. Sets up a connection to MongoDB and specifying the collection to work with. """ token = "mongodb://localhost:27017" client = MongoClient(token) self.cursor_db = client.cursor_db.content self.page_size = page_size def add_data(self,) -> None: """Inserts sample data into the MongoDB collection for demonstration purposes. Note: ----- It should only use once, otherwise you will have repeated data. """ sample_posts = [ {"title": "Post 1", "content": "Content 1", "date": datetime(2023, 8, 1)}, {"title": "Post 2", "content": "Content 2", "date": datetime(2023, 8, 2)}, {"title": "Post 3", "content": "Content 3", "date": datetime(2023, 8, 3)}, {"title": "Post 4", "content": "Content 4", "date": datetime(2023, 8, 4)}, {"title": "Post 5", "content": "Content 5", "date": datetime(2023, 8, 5)}, {"title": "Post 6", "content": "Content 6", "date": datetime(2023, 8, 6)}, {"title": "Post 7", "content": "Content 7", "date": datetime(2023, 8, 7)}, {"title": "Post 8", "content": "Content 8", "date": datetime(2023, 8, 8)}, {"title": "Post 9", "content": "Content 9", "date": datetime(2023, 8, 9)}, {"title": "Post 10", "content": "Content 10", "date": datetime(2023, 8, 10)}, {"title": "Post 11", "content": "Content 11", "date": datetime(2023, 8, 11)}, ] self.cursor_db.insert_many(sample_posts) def _fetch_next_page( self, cursor: ObjectId | None, page_size: int | None = None ) -> tuple[list, ObjectId | None, ObjectId | None, bool]: """Retrieves the next page of data based on the provided cursor. Parameters: ----------- cursor : ObjectId | None The current cursor indicating the last document of the previous page. page_size : int | None The number of documents to retrieve per page (default is the class's page_size). Returns: -------- tuple: - results (list): The list of documents retrieved. - next_cursor (ObjectId | None): The cursor pointing to the start of the next page, None in case is the last page. - prev_cursor (ObjectId | None): The cursor pointing to the start of the previous page, None in case is the start page. - at_end (bool): Whether this is the last page of results. """ # Use the provided page_size or fallback to the class attribute page_size = page_size or self.page_size # Check if there is a cursor if cursor: # Get documents with `_id` greater than the cursor query = {"_id": {'$gt': cursor}} else: # Get everything query = {} # Sort in ascending order by `_id` sort_order = 1 # Define the aggregation pipeline pipeline = [ {"$match": query}, # Filter based on the cursor {"$sort": {"_id": sort_order}}, # Sort documents by `_id` {"$limit": page_size + 1}, # Limit results to page_size + 1 to check if there's a next page # {"$project": {"_id": 1, "title": 1, "content": 1}} # In case you want to return only certain attributes ] # Execute the aggregation pipeline results = list(self.cursor_db.aggregate(pipeline)) # logger.debug(results) # Validate if some data was found if not results: raise ValueError("No data found") # Check if there are more documents than the page size if len(results) > page_size: # Deleting extra document results.pop(-1) # Set the cursor for the next page next_cursor = results[-1]['_id'] # Set the previous cursor if cursor: # in case the cursor have data prev_cursor = results[0]['_id'] else: # In case the cursor don't have data (first time) prev_cursor = None # Indicate you haven't reached the end of the data at_end = False else: # Indicate that there are not more pages available (last page reached) next_cursor = None # Set the cursor for the previous page prev_cursor = results[0]['_id'] # Indicate you have reached the end of the data at_end = True return results, next_cursor, prev_cursor, at_end def _fetch_previous_page( self, cursor: ObjectId | None, page_size: int | None = None, ) -> tuple[list, ObjectId | None, ObjectId | None, bool]: """Retrieves the previous page of data based on the provided cursor. Parameters: ----------- cursor : ObjectId | None The current cursor indicating the first document of the current page. page_size : int The number of documents to retrieve per page. prev_at_start : bool Indicates whether the previous page was the first page. Returns: -------- tuple: - results (list): The list of documents retrieved. - next_cursor (ObjectId | None): The cursor pointing to the start of the next page, None in case is the last page. - prev_cursor (ObjectId | None): The cursor pointing to the start of the previous page, None in case is the start page. - at_start (bool): Whether this is the first page of results. """ # Use the provided page_size or fallback to the class attribute page_size = page_size or self.page_size # Check if there is a cursor if cursor: # Get documents with `_id` less than the cursor query = {'_id': {'$lt': cursor}} else: # Get everything query = {} # Sort in descending order by `_id` sort_order = -1 # Define the aggregation pipeline pipeline = [ {"$match": query}, # Filter based on the cursor {"$sort": {"_id": sort_order}}, # Sort documents by `_id` {"$limit": page_size + 1}, # Limit results to page_size + 1 to check if there's a next page # {"$project": {"_id": 1, "title": 1, "content": 1}} # In case you want to return only certain attributes ] # Execute the aggregation pipeline results = list(self.cursor_db.aggregate(pipeline)) # Validate if some data was found if not results: raise ValueError("No data found") # Check if there are more documents than the page size if len(results) > page_size: # Deleting extra document results.pop(-1) # Reverse the results to maintain the correct order results.reverse() # Set the cursor for the previous page prev_cursor = results[0]['_id'] # Set the cursor for the next page next_cursor = results[-1]['_id'] # Indicate you are not at the start of the data at_start = False else: # Reverse the results to maintain the correct order results.reverse() # Indicate that there are not more previous pages available (initial page reached) prev_cursor = None # if prev_at_start: # # in case before was at the starting page # logger.warning("Caso 1") # next_cursor = results[0]['_id'] # else: # # in case before was not at the starting page # logger.warning("Caso 2") # next_cursor = results[-1]['_id'] next_cursor = results[-1]['_id'] # Indicate you have reached the start of the data at_start = True return results, next_cursor, prev_cursor, at_start def start_pagination(self): """Inicia la navegacion de datos.""" # Change page size in case you want it, only leave it here for reference page_size = None # Retrieve the first page of results results, next_cursor, prev_cursor, at_end = self._fetch_next_page(None, page_size) at_start = True logger.info(f"{results = }") logger.info(f"{next_cursor = }") logger.info(f"{prev_cursor = }") logger.info(f"{at_start = }") logger.info(f"{at_end = }") # if next_cursor: if not(at_start and at_end): while(True): print(125 * "*") if at_end: inn = input("Can only move Backward (b) or Cancel (c): ") stage = 0 # ===================================================== # You could reset at_end here, but in this example that # will fail in case the user sends something different # from Backward (b) or Cancel (c) # ===================================================== # at_end = False elif at_start: inn = input("Can only move Forward (f) or Cancel (c): ") stage = 1 # ===================================================== # You could reset at_end here, but in this example that # will fail in case the user sends something different # from Forward (f) or Cancel (c) # ===================================================== # at_start = False else: inn = input("Can move Forward (f), Backward (b), or Cancel (c): ") stage = 2 # Execute action acording to the input if inn == "f" and stage in [1, 2]: results, next_cursor, prev_cursor, at_end = self._fetch_next_page(next_cursor, page_size) # For this example, you must reset here the value, otherwise you lose the reference of the cursor at_start = False elif inn == "b" and stage in [0, 2]: # results, next_cursor, prev_cursor, at_start = self._fetch_previous_page(prev_cursor, at_start, page_size) results, next_cursor, prev_cursor, at_start = self._fetch_previous_page(prev_cursor, page_size) # For this example, you must reset here the value, otherwise you lose the reference of the cursor at_end = False elif inn == "c": logger.warning("------- Canceling execution -------") break else: print("Not valid action, it can only move in the opposite direction.") continue logger.info(f"{results = }") logger.info(f"{next_cursor = }") logger.info(f"{prev_cursor = }") logger.info(f"{at_start = }") logger.info(f"{at_end = }") else: logger.warning("There is not more data to show") @logger.catch def main(): """Main function.""" my_cursor = cursorPattern(page_size=5) # my_cursor.add_data() my_cursor.start_pagination() if __name__: main() logger.info("--- Execution end ---")
page_size 作为属性添加到类cursorpattern 中,以便更轻松地定义每个页面的大小,并向该类及其方法添加文档字符串。
希望这能帮助/指导需要实现光标分页的人。