diff --git a/cassandra/cluster.py b/cassandra/cluster.py index d5f80290a9..ee296005ac 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -37,6 +37,9 @@ import time from threading import Lock, RLock, Thread, Event import uuid +import os +import urllib.request, urllib.error +import json import weakref from weakref import WeakValueDictionary @@ -81,6 +84,7 @@ from cassandra.marshal import int64_pack from cassandra.timestamps import MonotonicTimestampGenerator from cassandra.util import _resolve_contact_points_to_string_map, Version +from cassandra.concurrent import execute_concurrent, execute_concurrent_with_args from cassandra.datastax.insights.reporter import MonitorReporter from cassandra.datastax.insights.util import version_supports_insights @@ -2725,6 +2729,79 @@ def execute_async(self, query, parameters=None, trace=False, custom_payload=None future.send_request() return future + def execute_concurrent(self, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT): + """ + Executes a sequence of (statement, parameters) tuples concurrently. Each + ``parameters`` item must be a sequence or :const:`None`. + + The `concurrency` parameter controls how many statements will be executed + concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2, + it is recommended that this be kept below 100 times the number of + core connections per host times the number of connected hosts (see + :meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded, + the event loop thread may attempt to block on new connection creation, + substantially impacting throughput. If :attr:`~.Cluster.protocol_version` + is 3 or higher, you can safely experiment with higher levels of concurrency. + + If `raise_on_first_error` is left as :const:`True`, execution will stop + after the first failed statement and the corresponding exception will be + raised. 
+ + `results_generator` controls how the results are returned. + + * If :const:`False`, the results are returned only after all requests have completed. + * If :const:`True`, a generator expression is returned. Using a generator results in a constrained + memory footprint when the result set will be large -- results are yielded + as they return instead of materializing the entire list at once. The trade-off for a lower memory + footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results + on-the-fly). + + The `execution_profile` argument is the execution profile to use for this + request; it is passed directly to :meth:`Session.execute_async`. + + A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned + in the same order that the statements were passed in. If ``success`` is :const:`False`, + there was an error executing the statement, and ``result_or_exc`` + will be an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc`` + will be the query result. + + Example usage:: + + select_statement = session.prepare("SELECT * FROM users WHERE id=?") + + statements_and_params = [] + for user_id in user_ids: + params = (user_id, ) + statements_and_params.append((select_statement, params)) + + results = session.execute_concurrent(statements_and_params, raise_on_first_error=False) + + for (success, result) in results: + if not success: + handle_error(result) # result will be an Exception + else: + process_user(result[0]) # result will be a list of rows + + Note: in the case that generators are used (``results_generator=True``), it is important to ensure the consumers do not + block or attempt further synchronous requests, because no further IO will be processed until + the consumer returns. This may also produce a deadlock in the IO event thread. 
+ """ + return execute_concurrent(self, statements_and_parameters, concurrency, raise_on_first_error, results_generator, execution_profile) + + def execute_concurrent_with_args(self, statement, parameters, *args, **kwargs): + """ + Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single + statement and a sequence of parameters. Each item in ``parameters`` + should be a sequence or :const:`None`. + + Example usage:: + + statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)") + parameters = [(x,) for x in range(1000)] + session.execute_concurrent_with_args(statement, parameters, concurrency=50) + """ + return execute_concurrent_with_args(self, statement, parameters, *args, **kwargs) + def execute_graph(self, query, parameters=None, trace=False, execution_profile=EXEC_PROFILE_GRAPH_DEFAULT, execute_as=None): """ Executes a Gremlin query string or GraphStatement synchronously, diff --git a/cassandra/concurrent.py b/cassandra/concurrent.py index fb8f26e1cc..b317826de1 100644 --- a/cassandra/concurrent.py +++ b/cassandra/concurrent.py @@ -29,61 +29,7 @@ def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True, results_generator=False, execution_profile=EXEC_PROFILE_DEFAULT): """ - Executes a sequence of (statement, parameters) tuples concurrently. Each - ``parameters`` item must be a sequence or :const:`None`. - - The `concurrency` parameter controls how many statements will be executed - concurrently. When :attr:`.Cluster.protocol_version` is set to 1 or 2, - it is recommended that this be kept below 100 times the number of - core connections per host times the number of connected hosts (see - :meth:`.Cluster.set_core_connections_per_host`). If that amount is exceeded, - the event loop thread may attempt to block on new connection creation, - substantially impacting throughput. 
If :attr:`~.Cluster.protocol_version` - is 3 or higher, you can safely experiment with higher levels of concurrency. - - If `raise_on_first_error` is left as :const:`True`, execution will stop - after the first failed statement and the corresponding exception will be - raised. - - `results_generator` controls how the results are returned. - - * If :const:`False`, the results are returned only after all requests have completed. - * If :const:`True`, a generator expression is returned. Using a generator results in a constrained - memory footprint when the results set will be large -- results are yielded - as they return instead of materializing the entire list at once. The trade for lower memory - footprint is marginal CPU overhead (more thread coordination and sorting out-of-order results - on-the-fly). - - `execution_profile` argument is the execution profile to use for this - request, it is passed directly to :meth:`Session.execute_async`. - - A sequence of ``ExecutionResult(success, result_or_exc)`` namedtuples is returned - in the same order that the statements were passed in. If ``success`` is :const:`False`, - there was an error executing the statement, and ``result_or_exc`` will be - an :class:`Exception`. If ``success`` is :const:`True`, ``result_or_exc`` - will be the query result. 
- - Example usage:: - - select_statement = session.prepare("SELECT * FROM users WHERE id=?") - - statements_and_params = [] - for user_id in user_ids: - params = (user_id, ) - statements_and_params.append((select_statement, params)) - - results = execute_concurrent( - session, statements_and_params, raise_on_first_error=False) - - for (success, result) in results: - if not success: - handle_error(result) # result will be an Exception - else: - process_user(result[0]) # result will be a list of rows - - Note: in the case that `generators` are used, it is important to ensure the consumers do not - block or attempt further synchronous requests, because no further IO will be processed until - the consumer returns. This may also produce a deadlock in the IO event thread. + See :meth:`.Session.execute_concurrent`. """ if concurrency <= 0: raise ValueError("concurrency must be greater than 0") @@ -216,14 +162,6 @@ def _results(self): def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs): """ - Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single - statement and a sequence of parameters. Each item in ``parameters`` - should be a sequence or :const:`None`. - - Example usage:: - - statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)") - parameters = [(x,) for x in range(1000)] - execute_concurrent_with_args(session, statement, parameters, concurrency=50) + See :meth:`.Session.execute_concurrent_with_args`. """ return execute_concurrent(session, zip(cycle((statement,)), parameters), *args, **kwargs)