Friday, 9 February 2018

Apache Cassandra Materialized View

Materialized views are a feature, first released in Cassandra 3.0, that automatically maintains a shadow table (the materialized view) of a base table with a different partition key, allowing efficient SELECT queries on the same data using different keys.
Many Cassandra users will be aware that the Apache Cassandra project recently made the decision to mark materialized views as experimental, beginning with Cassandra 3.0.16 and 3.11.2 (for further details see https://mail-archives.apache.org/mod_mbox/cassandra-user/201710.mbox/%3CetPan.59f24f38.438f4e99.74dc%40apple.com%3E and https://issues.apache.org/jira/browse/CASSANDRA-13959). As this move may cause concern to users who are already using materialized views, this post provides our recommendations for those users and clarifies our position on materialized views for Instaclustr managed service and support customers.
Materialized views have been around for some time and, in our observation, are reasonably widely deployed in recently developed Cassandra applications. Quite a number of issues have been found through these initial deployments, many of which have been fixed in recent releases of Apache Cassandra. However, these deployments have also exposed some fundamental issues with materialized views, which prompted the decision to move them to experimental status:
  • There is no in-built method for reconciling the materialized view with the base table (which should not matter if everything functions as expected but, in a complex distributed system, would be a valuable safety net).
  • If you do find differences between the materialized view and base table, there is no in-built method for re-synchronizing the view with the base table other than dropping the materialized view and recreating.
  • There are no strong guarantees on how long it takes for updates to the base table to be reflected in materialized views (a property inherited from the logged batch mechanism that materialized views are built on).
Users with a need to retain copies of their data with an alternate partition key structure are therefore left with basically two choices:
  1. adopt MVs with these known limitations and develop their own work-arounds (i.e. reconciliation processes) or accept the associated risks; or
  2. fall back to using application code to maintain multiple views of the data (which will likely still require the development of reconciliation tools).
The move of materialized view to an experimental state does highlight the risk (that exists with any software) that there are other, currently unknown issues. However, in recent versions many of the known issues have been fixed, and with some care materialized views are being used successfully without major issues. The section “Recent Fixes and Specific Considerations” below sets out these fixes, some remaining known edge cases and also considerations around repairs.
Instaclustr’s position on support of materialized views for our managed service and support customers is as follows:
  1. We will support materialized views within the known functional limitations set out in this post.
  2. We recommend that you explicitly test the correctness of materialized views for your application scenarios, including under load (do not assume correctness).
  3. We recommend that you develop reconciliation checking tools to check the correctness of your materialized views against your base tables and run these regularly in production. (Any identified issues can likely be fixed manually by upserting to the base table; tools may be developed for this if required.)
  4. Avoid using incremental repairs with materialized views.
  5. Ensure you follow Cassandra data modelling best practice and consider partition sizes for both the base table and materialized view.
  6. Do not create a materialized view with filtering on a non-primary key column (this is now disabled by default).
We appreciate that it is undesirable for functions to be released like this when they are not production ready. We have been heartened to see the Cassandra project move to a higher bar for quality and a greater focus on stability in recent times and see this clarification of the status of materialized views as a positive move in that regard.
In addition to the Cassandra project’s moves, Instaclustr has commenced steps to develop a certification process for the versions of Cassandra that we support. This will provide a documented level of testing and results in addition to the project’s testing, as well as guidance on the maturity and level of support for versions and new features. We expect to release this process in Q1 2018.
Should you have any questions regarding this material please contact info@instaclustr.com.

Recent Fixes and Specific Considerations

Recent Fixes

In 3.11.1 a number of cases were fixed that had resulted in inconsistent data between the base table and the materialized view. These consisted of issues relating to TTLs, the use of TIMESTAMP, using an additional non-primary key column in the primary key of the materialized view, deletions, and filtering on non-partition key columns in the view. Following is a list of the issues fixed; note that most of these were fixed together in CASSANDRA-11500.
  • Range tombstones created prior to the data they shadow will not delete the data in the materialized view – CASSANDRA-13787
  • DELETE of unselected column/collection should not affect ordered updates – CASSANDRA-13127
  • Unselected columns should keep the materialized view row alive when other columns expire – CASSANDRA-13127
Specifically affecting materialized views with an extra non-PK column in the view PK:
  • View row should expire when view PK column expires in base – CASSANDRA-13657
  • Commutative row deletion – CASSANDRA-13409
  • Out of order updates to extra column on view PK – CASSANDRA-11500

Edge Case Issues

There were also consistency issues related to filtering in the materialized view on non-primary key columns (e.g. CREATE MATERIALIZED VIEW AS SELECT * WHERE enabled = True) that could result in inconsistent data between the base table and the materialized view. This case could not be fixed without a large storage rewrite, which cannot happen until 4.0, so the functionality has been disabled by default in 3.11.1. It can be re-enabled with a JVM parameter (-Dcassandra.mv.allow_filtering_nonkey_columns_unsafe), but you should understand the potential implications of using materialized views in this way. More information can be found in CASSANDRA-13798 and CASSANDRA-13547.
We recommend against creating a materialized view that filters on a non-primary key column. If you have already adopted this pattern or absolutely need it, you should continue only if you intend to stick to a write-once pattern for the base table: updating the filtered non-primary key column will inevitably lead to inconsistent data between the materialized view and the base table. The typical symptom is that, after multiple updates to the filtered column, the materialized view row disappears. The simplest way to avoid the problem is a write-once pattern on the base table, with no updates or manual deletions. The following example illustrates the problem.
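A minimal sketch of the failing sequence, assuming a simple base table with a boolean enabled column and a view filtered on it (names are illustrative, and re-enabling the feature on 3.11.1+ requires the JVM flag mentioned above):
CREATE TABLE users (
    id int PRIMARY KEY,
    name text,
    enabled boolean
);

CREATE MATERIALIZED VIEW enabled_users AS
    SELECT * FROM users
    WHERE id IS NOT NULL AND enabled = True
    PRIMARY KEY (id);

INSERT INTO users (id, name, enabled) VALUES (1, 'john', True);
UPDATE users SET enabled = False WHERE id = 1;
UPDATE users SET enabled = True WHERE id = 1;

-- The row matches the filter again, so it should be visible in the view:
SELECT * FROM enabled_users WHERE id = 1;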
 (0 rows)
The view row is now dead but should be alive.
Another specific case to be aware of is the deletion of columns that are not selected in the materialized view. This scenario can result in the deletion not being properly reflected in the view. At the moment the only proven case of this is when deletions issued before 3.11.1 are propagated after upgrading to 3.11.1 via repairs or hints. This is low risk but still a possibility, so we recommend avoiding deletions on columns not included in the select clause of the view; for example, deletions like the one sketched below should be avoided.
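A minimal sketch, assuming a base table with a column b that the view does not select (names are illustrative):
CREATE TABLE base (
    k int PRIMARY KEY,
    a int,
    b int
);

CREATE MATERIALIZED VIEW base_by_a AS
    SELECT k, a FROM base
    WHERE k IS NOT NULL AND a IS NOT NULL
    PRIMARY KEY (a, k);

-- Deletions that touch only the unselected column b are the ones to avoid:
DELETE b FROM base WHERE k = 1;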

Other Considerations

Other known issues mostly revolve around poor data models that result in very large partitions. The batchlog and write path are currently incapable of handling views with very large partitions. A partition deletion that affects a large number of view primary keys will generate a single mutation (write), which may exceed limits such as max_mutation_size (default 16MB) or max_value_size (default 256MB). If you hit one of these errors, the relevant rows in the view may not be deleted. The easiest way to avoid this issue is to avoid view data models that result in very large partitions or wide rows.
You should also be aware of some issues with repairs. Firstly, you should avoid incremental repairs against MVs and stick to full repairs only (CASSANDRA-12888). Secondly, to avoid inconsistencies being created in the view, you should repair the base table first and then repair the view, as certain combinations of inconsistencies across the nodes could result in a repair bringing back data in the view (CASSANDRA-13073).
As always, we recommend testing your views in the same way you would test a normal table. Ensure you have tested and verified all your operations before using them in production. Be sure to test repair as well, and ensure your repair strategy works with materialized views.
====================================================================

What are Materialized Views?

One of the standard Cassandra strategies for dealing with more sophisticated queries is to create CQL tables that contain the data in a structure that matches the query itself (denormalization). Cassandra 3.0 introduces a new CQL feature, Materialized Views, which captures this concept as a first-class construct.
We decided to take a closer look.
Materialized Views are essentially standard CQL tables that are maintained automatically by the Cassandra server – as opposed to the application manually writing to many denormalized tables containing the same data, as in previous releases of Cassandra. At a glance, this looks like a great feature: it automates a process that was previously done by hand, and the server takes responsibility for maintaining the various data structures.

How to use them?

For example, let’s suppose that we want to capture payment transaction information for a set of users. You can have the following structure as your base table which you would write the transactions to:
CREATE TABLE cc_transactions (
    userid text,
    year int,
    month int,
    day int,
    id int,
    amount int,
    card text,
    status text,
    PRIMARY KEY ((userid, year), month, day, id)
);
This table can be used to record transactions of users for each year, and is suitable for querying the transaction log of each of our users.
Let’s suppose there is a requirement for an administrative function that allows all the transactions for a given day to be viewed.
CQL has been extended with the CREATE MATERIALIZED VIEW command, which can be used in the following manner:
CREATE MATERIALIZED VIEW transactions_by_day AS
    SELECT year, month, day, userid, id, amount, card, status
    FROM mvdemo.cc_transactions
    WHERE userid IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL AND id IS NOT NULL AND card IS NOT NULL
    PRIMARY KEY ((year, month, day), userid, id);
Let’s insert some data:
insert into cc_transactions (userid, year, month, day, id, card, amount, status) values ('John', 2017, 2, 6, 1, '1111-1111-1111-1111', -10, 'COMPLETED');
insert into cc_transactions (userid, year, month, day, id, card, amount, status) values ('John', 2017, 2, 6, 2, '1111-1111-1111-1111', 20, 'PENDING');
insert into cc_transactions (userid, year, month, day, id, card, amount, status) values ('Bob', 2017, 2, 6, 3, '2222-2222-2222-2222', -17, 'COMPLETED');
insert into cc_transactions (userid, year, month, day, id, card, amount, status) values ('Bob', 2017, 2, 7, 4, '2222-2222-2222-2222', -32, 'COMPLETED');
As you would expect, you can then execute the following queries:
select * from cc_transactions where userid = 'John' and year = 2017;

 userid | year | month | day | id | amount | card                | status
--------+------+-------+-----+----+--------+---------------------+-----------
   John | 2017 |     2 |   6 |  1 |    -10 | 1111-1111-1111-1111 | COMPLETED
   John | 2017 |     2 |   6 |  2 |     20 | 1111-1111-1111-1111 |   PENDING
And:
select * from transactions_by_day where year = 2017 and month = 2 and day = 6;

 year | month | day | userid | id | amount | card                | status
------+-------+-----+--------+----+--------+---------------------+-----------
 2017 |     2 |   6 |    Bob |  3 |    -17 | 2222-2222-2222-2222 | COMPLETED
 2017 |     2 |   6 |   John |  1 |    -10 | 1111-1111-1111-1111 | COMPLETED
 2017 |     2 |   6 |   John |  2 |     20 | 1111-1111-1111-1111 |   PENDING

Behind the scenes

The Materialized View is not a fundamentally special construct. Behind the scenes, Cassandra creates a “standard” table, and any mutation or access goes through the usual write and read paths.
If we look into the data directory for this keyspace, we should expect to find two separate subdirectories, containing SSTables for the base table and the Materialized View:
$ ls -la
total 16
drwxrwxr-x  4 davibo davibo 4096 Feb  9 10:32 .
drwxrwxr-x 10 davibo davibo 4096 Feb  8 12:11 ..
drwxrwxr-x  4 davibo davibo 4096 Feb  9 10:34 cc_transactions-14b32420eeb311e6b4a3754b64ff1113
drwxrwxr-x  3 davibo davibo 4096 Feb  9 10:34 transactions_by_day-1f36a390eeb311e6b4a3754b64ff1113
Let’s investigate the declaration of the Materialized View in a bit more detail:
CREATE MATERIALIZED VIEW transactions_by_day AS
    SELECT year, month, day, userid, id, amount, card, status
    FROM cc_transactions
    WHERE userid IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL AND id IS NOT NULL AND card IS NOT NULL
    PRIMARY KEY ((year, month, day), userid, id);
Note the PRIMARY KEY clause at the end of this statement. This is much like what you would expect from Cassandra data modeling: it defines the partition key and clustering columns for the Materialized View’s backing table. As such it should always be chosen carefully, and the usual best practices apply to it:
  • Avoid unbounded partitions
  • Avoid too large partitions
  • Choose your partition key in a way that distributes the data correctly, avoiding cluster hotspots (the partition key chosen above is not a good one as it leads to temporal hotspots)
Also note the IS NOT NULL restrictions on all the columns declared as part of the primary key. These ensure that no record in the Materialized View can exist with an incomplete primary key. This is currently a strict requirement when creating Materialized Views, and omitting these checks will result in an error: Primary key column 'year' is required to be filtered by 'IS NOT NULL'
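For example, a definition like the following sketch, which omits the IS NOT NULL restriction on the year column, would be rejected with that error:
CREATE MATERIALIZED VIEW transactions_by_day AS
    SELECT * FROM cc_transactions
    WHERE userid IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL AND id IS NOT NULL
    PRIMARY KEY ((year, month, day), userid, id);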

Functional limitations

In the current versions of Cassandra there are a number of limitations on the definition of Materialized Views.

A primary key of a Materialized View must contain all columns from the primary key of the base table

Any materialized view must map one CQL row in the base table to precisely one row in the materialized view. In practice this means that all columns of the original primary key (partition key and clustering columns) must be represented in the materialized view; however, they can appear in any order and can define a different partitioning compared to the base table.
If you are accustomed to relational database systems, this may feel like an odd restriction. It actually makes sense if you consider how Cassandra manages the data in the Materialized View. Since the View is nothing more under the hood than another Cassandra table, and is updated via the usual mechanisms, when the base table is updated an appropriate mutation is automatically generated and applied to the View.
If a single CQL row in the Materialized View were the result of potentially collapsing multiple base table rows, Cassandra would have no way of tracking the changes from all those base rows and representing them appropriately in the Materialized View (this is especially problematic for deletions of base rows).
As a result you are not allowed to define a Materialized View like this:
CREATE MATERIALIZED VIEW transactions_by_card AS
    SELECT userid, card, year, month, day, id, amount, status
    FROM cc_transactions
    WHERE year IS NOT NULL AND id IS NOT NULL AND card IS NOT NULL
    PRIMARY KEY ((card, year), id);
This attempt will result in the following error: Cannot create Materialized View transactions_by_card without primary key columns from base cc_transactions (day,month,userid)
This may be somewhat surprising – the id column is a unique transaction identifier after all. However, this uniqueness comes from the semantics of the data model, and Cassandra has no way of understanding (or verifying and enforcing) that it actually holds. As a developer you have more knowledge of the data being manipulated than it is possible to declare in the CQL model.
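For completeness, a variant of this view that does satisfy the rule keeps every base primary key column and simply repartitions the data. A sketch might look like the following (the choice of (card, year) as the partition key is only illustrative; note that card is the one extra non-primary-key column, which the next restriction covers):
CREATE MATERIALIZED VIEW transactions_by_card AS
    SELECT userid, card, year, month, day, id, amount, status
    FROM cc_transactions
    WHERE userid IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL
        AND day IS NOT NULL AND id IS NOT NULL AND card IS NOT NULL
    PRIMARY KEY ((card, year), userid, month, day, id);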

A primary key of a Materialized View can contain at most one other column

As established already, the full base primary key must be part of the primary key of the Materialized View. It is possible to add another column from the base table that was not part of the original primary key, but this is restricted to a single additional column.
Again, this restriction feels rather odd. In this case the explanation is much more subtle: in certain concurrent update scenarios, when several such columns of the base table are manipulated at the same time, it is technically difficult to implement a solution on Cassandra’s side that guarantees no data (or deletions) are lost and the Materialized Views remain consistent with the base table.
This restriction may be lifted in later releases, once the following tickets are resolved:
https://issues.apache.org/jira/browse/CASSANDRA-9928
https://issues.apache.org/jira/browse/CASSANDRA-10226

Advanced WHERE filtering criteria on columns that are not part of the base table’s primary key are only supported from Cassandra 3.10

Let’s suppose you want to create a View for “suspicious” transactions – those with an unusually large amount associated with them. A possible way of implementing this is a Materialized View with more complex filter criteria:
CREATE MATERIALIZED VIEW suspicious_transactions AS
    SELECT userid, year, month, day, id, amount, card, status
    FROM cc_transactions
    WHERE userid IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL AND id IS NOT NULL 
        AND amount > 1000
    PRIMARY KEY ((userid, year), month, day, id);
This works on Cassandra 3.10 (the latest release at the time of writing this blog), and produces the results you would expect:
After executing:
insert into cc_transactions (userid, year, month, day, id, card, amount, status) values ('Bob', 2017, 2, 7, 5, '2222-2222-2222-2222', 1200, 'COMPLETED');
When we query:
> select * from cc_transactions where userid = 'Bob' and year = 2017;

 userid | year | month | day | id | amount | card                | status
--------+------+-------+-----+----+--------+---------------------+-----------
    Bob | 2017 |     2 |   6 |  3 |    -17 | 2222-2222-2222-2222 | COMPLETED
    Bob | 2017 |     2 |   7 |  4 |    -32 | 2222-2222-2222-2222 | COMPLETED
    Bob | 2017 |     2 |   7 |  5 |   1200 | 2222-2222-2222-2222 | COMPLETED

> select * from suspicious_transactions where userid = 'Bob' and year = 2017;

 userid | year | month | day | id | amount | card                | status
--------+------+-------+-----+----+--------+---------------------+-----------
    Bob | 2017 |     2 |   7 |  5 |   1200 | 2222-2222-2222-2222 | COMPLETED
However on Cassandra 3.9 we get the error: Non-primary key columns cannot be restricted in the SELECT statement used for materialized view creation (got restrictions on: amount)

Performance considerations

Maintaining the consistency between the base table and the associated Materialized Views comes with a cost. Since a Materialized View is effectively a Cassandra table, there is the obvious cost of writing to these tables. There is more to it though. Writing to any base table that has associated Materialized Views will result in the following:
  1. Locking of the entire partition
  2. Reading the current partition contents
  3. Calculating all the view mutations
  4. Creating a batch of the base mutation + the view mutations
  5. Executing all the changes
The first two steps ensure that a consistent state of the data is persisted across all Materialized Views – no two updates on the base table are allowed to interleave, so we are certain to read a consistent state of the full row and generate any Materialized View updates based on it.
Creating a batch of the mutations is for atomicity – using Cassandra’s batching capabilities ensures that if the base table mutation is successful, all the views will eventually represent the correct state. In practice this adds significant overhead to write operations. Especially because a read operation is executed before the write, it transforms the expected performance characteristics quite dramatically (writes in Cassandra normally don’t require random disk I/O, but in this case they will).
A tracing session for a standard write with Consistency Level ONE looks like this:
activity                                                                                    | timestamp                  | source    | source_elapsed | client
--------------------------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
                                                                         Execute CQL3 query | 2017-02-09 16:55:30.467000 | 127.0.0.1 |              0 | 127.0.0.1
      Parsing insert into cc_transactions (...) values (...); [Native-Transport-Requests-1] | 2017-02-09 16:55:30.467000 | 127.0.0.1 |            234 | 127.0.0.1
                                          Preparing statement [Native-Transport-Requests-1] | 2017-02-09 16:55:30.467000 | 127.0.0.1 |            460 | 127.0.0.1
                            Determining replicas for mutation [Native-Transport-Requests-1] | 2017-02-09 16:55:30.468000 | 127.0.0.1 |            945 | 127.0.0.1
           MUTATION message received from /127.0.0.1 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 16:55:30.468000 | 127.0.0.3 |             47 | 127.0.0.1
                                                   Appending to commitlog [MutationStage-2] | 2017-02-09 16:55:30.468000 | 127.0.0.1 |           1154 | 127.0.0.1
                                       Adding to cc_transactions memtable [MutationStage-2] | 2017-02-09 16:55:30.468000 | 127.0.0.1 |           1319 | 127.0.0.1
        Sending MUTATION message to /127.0.0.3 [MessagingService-Outgoing-/127.0.0.3-Small] | 2017-02-09 16:55:30.468000 | 127.0.0.1 |           1359 | 127.0.0.1
        Sending MUTATION message to /127.0.0.2 [MessagingService-Outgoing-/127.0.0.2-Small] | 2017-02-09 16:55:30.468000 | 127.0.0.1 |           1446 | 127.0.0.1
                                                   Appending to commitlog [MutationStage-2] | 2017-02-09 16:55:30.469000 | 127.0.0.3 |            474 | 127.0.0.1
           MUTATION message received from /127.0.0.1 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 16:55:30.469000 | 127.0.0.2 |             26 | 127.0.0.1
                                       Adding to cc_transactions memtable [MutationStage-2] | 2017-02-09 16:55:30.469000 | 127.0.0.3 |            643 | 127.0.0.1
                                         Enqueuing response to /127.0.0.1 [MutationStage-2] | 2017-02-09 16:55:30.469000 | 127.0.0.3 |            819 | 127.0.0.1
   REQUEST_RESPONSE message received from /127.0.0.3 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 16:55:30.470000 | 127.0.0.1 |             27 | 127.0.0.1
Sending REQUEST_RESPONSE message to /127.0.0.1 [MessagingService-Outgoing-/127.0.0.1-Small] | 2017-02-09 16:55:30.470000 | 127.0.0.3 |           1381 | 127.0.0.1
                                                   Appending to commitlog [MutationStage-1] | 2017-02-09 16:55:30.470000 | 127.0.0.2 |           1065 | 127.0.0.1
                                       Adding to cc_transactions memtable [MutationStage-1] | 2017-02-09 16:55:30.470000 | 127.0.0.2 |           1431 | 127.0.0.1
                                         Enqueuing response to /127.0.0.1 [MutationStage-1] | 2017-02-09 16:55:30.470000 | 127.0.0.2 |           1723 | 127.0.0.1
Sending REQUEST_RESPONSE message to /127.0.0.1 [MessagingService-Outgoing-/127.0.0.1-Small] | 2017-02-09 16:55:30.470001 | 127.0.0.2 |           1983 | 127.0.0.1
                               Processing response from /127.0.0.3 [RequestResponseStage-2] | 2017-02-09 16:55:30.471000 | 127.0.0.1 |            531 | 127.0.0.1
   REQUEST_RESPONSE message received from /127.0.0.2 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 16:55:30.471000 | 127.0.0.1 |             24 | 127.0.0.1
                               Processing response from /127.0.0.2 [RequestResponseStage-1] | 2017-02-09 16:55:30.472000 | 127.0.0.1 |            225 | 127.0.0.1
                                                                           Request complete | 2017-02-09 16:55:30.468692 | 127.0.0.1 |           1692 | 127.0.0.1
Executing the same insert with one Materialized View on the table results in the following trace:
 activity                                                                                    | timestamp                  | source    | source_elapsed | client
--------------------------------------------------------------------------------------------+----------------------------+-----------+----------------+-----------
                                                                         Execute CQL3 query | 2017-02-09 17:03:15.651000 | 127.0.0.1 |              0 | 127.0.0.1
      Parsing insert into cc_transactions (...) values (...); [Native-Transport-Requests-1] | 2017-02-09 17:03:15.651000 | 127.0.0.1 |            183 | 127.0.0.1
                                          Preparing statement [Native-Transport-Requests-1] | 2017-02-09 17:03:15.652000 | 127.0.0.1 |            416 | 127.0.0.1
                            Determining replicas for mutation [Native-Transport-Requests-1] | 2017-02-09 17:03:15.652000 | 127.0.0.1 |            835 | 127.0.0.1
                                       Appending to commitlog [Native-Transport-Requests-1] | 2017-02-09 17:03:15.652000 | 127.0.0.1 |           1047 | 127.0.0.1
 Creating materialized view mutations from base table replica [Native-Transport-Requests-1] | 2017-02-09 17:03:15.652000 | 127.0.0.1 |           1139 | 127.0.0.1
          Executing single-partition query on cc_transactions [Native-Transport-Requests-1] | 2017-02-09 17:03:15.652000 | 127.0.0.1 |           1231 | 127.0.0.1
                                 Acquiring sstable references [Native-Transport-Requests-1] | 2017-02-09 17:03:15.653000 | 127.0.0.1 |           1303 | 127.0.0.1
                                    Merging memtable contents [Native-Transport-Requests-1] | 2017-02-09 17:03:15.653000 | 127.0.0.1 |           1346 | 127.0.0.1
                            Read 1 live and 0 tombstone cells [Native-Transport-Requests-1] | 2017-02-09 17:03:15.653000 | 127.0.0.1 |           1789 | 127.0.0.1
                            Determining replicas for mutation [Native-Transport-Requests-1] | 2017-02-09 17:03:15.653000 | 127.0.0.1 |           1889 | 127.0.0.1
                                       Appending to commitlog [Native-Transport-Requests-1] | 2017-02-09 17:03:15.653000 | 127.0.0.1 |           1985 | 127.0.0.1
                       Adding to transactions_by_day memtable [Native-Transport-Requests-1] | 2017-02-09 17:03:15.653001 | 127.0.0.1 |           2118 | 127.0.0.1
                           Adding to cc_transactions memtable [Native-Transport-Requests-1] | 2017-02-09 17:03:15.653001 | 127.0.0.1 |           2270 | 127.0.0.1
        Sending MUTATION message to /127.0.0.2 [MessagingService-Outgoing-/127.0.0.2-Small] | 2017-02-09 17:03:15.654000 | 127.0.0.1 |           2744 | 127.0.0.1
           MUTATION message received from /127.0.0.1 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 17:03:15.654000 | 127.0.0.2 |             69 | 127.0.0.1
        Sending MUTATION message to /127.0.0.3 [MessagingService-Outgoing-/127.0.0.3-Small] | 2017-02-09 17:03:15.654000 | 127.0.0.1 |           2773 | 127.0.0.1
           MUTATION message received from /127.0.0.1 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 17:03:15.655000 | 127.0.0.3 |             42 | 127.0.0.1
                                                   Appending to commitlog [MutationStage-1] | 2017-02-09 17:03:15.655000 | 127.0.0.2 |            719 | 127.0.0.1
             Creating materialized view mutations from base table replica [MutationStage-1] | 2017-02-09 17:03:15.655000 | 127.0.0.2 |            952 | 127.0.0.1
                                                   Appending to commitlog [MutationStage-3] | 2017-02-09 17:03:15.656000 | 127.0.0.3 |            873 | 127.0.0.1
                      Executing single-partition query on cc_transactions [MutationStage-1] | 2017-02-09 17:03:15.656000 | 127.0.0.2 |           1125 | 127.0.0.1
             Creating materialized view mutations from base table replica [MutationStage-3] | 2017-02-09 17:03:15.656000 | 127.0.0.3 |           1168 | 127.0.0.1
                                             Acquiring sstable references [MutationStage-1] | 2017-02-09 17:03:15.656000 | 127.0.0.2 |           1327 | 127.0.0.1
                      Executing single-partition query on cc_transactions [MutationStage-3] | 2017-02-09 17:03:15.656000 | 127.0.0.3 |           1364 | 127.0.0.1
                                                Merging memtable contents [MutationStage-1] | 2017-02-09 17:03:15.656000 | 127.0.0.2 |           1565 | 127.0.0.1
                                             Acquiring sstable references [MutationStage-3] | 2017-02-09 17:03:15.656000 | 127.0.0.3 |           1491 | 127.0.0.1
                                                Merging memtable contents [MutationStage-3] | 2017-02-09 17:03:15.657000 | 127.0.0.3 |           1625 | 127.0.0.1
                                        Read 1 live and 0 tombstone cells [MutationStage-1] | 2017-02-09 17:03:15.657000 | 127.0.0.2 |           2194 | 127.0.0.1
                                        Read 1 live and 0 tombstone cells [MutationStage-3] | 2017-02-09 17:03:15.657000 | 127.0.0.3 |           2274 | 127.0.0.1
                                        Determining replicas for mutation [MutationStage-1] | 2017-02-09 17:03:15.657000 | 127.0.0.2 |           2403 | 127.0.0.1
                                        Determining replicas for mutation [MutationStage-3] | 2017-02-09 17:03:15.657000 | 127.0.0.3 |           2454 | 127.0.0.1
                                                   Appending to commitlog [MutationStage-1] | 2017-02-09 17:03:15.657000 | 127.0.0.2 |           2523 | 127.0.0.1
                                   Adding to transactions_by_day memtable [MutationStage-1] | 2017-02-09 17:03:15.657000 | 127.0.0.2 |           2675 | 127.0.0.1
                                       Adding to cc_transactions memtable [MutationStage-1] | 2017-02-09 17:03:15.657000 | 127.0.0.2 |           2866 | 127.0.0.1
                                         Enqueuing response to /127.0.0.1 [MutationStage-1] | 2017-02-09 17:03:15.657001 | 127.0.0.2 |           3054 | 127.0.0.1
   REQUEST_RESPONSE message received from /127.0.0.2 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 17:03:15.658000 | 127.0.0.1 |             73 | 127.0.0.1
Sending REQUEST_RESPONSE message to /127.0.0.1 [MessagingService-Outgoing-/127.0.0.1-Small] | 2017-02-09 17:03:15.658000 | 127.0.0.2 |           3318 | 127.0.0.1
                               Processing response from /127.0.0.2 [RequestResponseStage-5] | 2017-02-09 17:03:15.658000 | 127.0.0.1 |            265 | 127.0.0.1
                                                   Appending to commitlog [MutationStage-3] | 2017-02-09 17:03:15.658000 | 127.0.0.3 |           2610 | 127.0.0.1
                                   Adding to transactions_by_day memtable [MutationStage-3] | 2017-02-09 17:03:15.658000 | 127.0.0.3 |           2884 | 127.0.0.1
                                       Adding to cc_transactions memtable [MutationStage-3] | 2017-02-09 17:03:15.658000 | 127.0.0.3 |           3116 | 127.0.0.1
                                         Enqueuing response to /127.0.0.1 [MutationStage-3] | 2017-02-09 17:03:15.658000 | 127.0.0.3 |           3339 | 127.0.0.1
   REQUEST_RESPONSE message received from /127.0.0.3 [MessagingService-Incoming-/127.0.0.1] | 2017-02-09 17:03:15.661000 | 127.0.0.1 |             44 | 127.0.0.1
Sending REQUEST_RESPONSE message to /127.0.0.1 [MessagingService-Outgoing-/127.0.0.1-Small] | 2017-02-09 17:03:15.661000 | 127.0.0.3 |           5864 | 127.0.0.1
                               Processing response from /127.0.0.3 [RequestResponseStage-4] | 2017-02-09 17:03:15.662000 | 127.0.0.1 |            302 | 127.0.0.1
                                                                           Request complete | 2017-02-09 17:03:15.653748 | 127.0.0.1 |           2748 | 127.0.0.1
As you can see from the traces, the additional cost on the writes is significant.
Bear in mind that this is not a fair comparison – we are comparing a single-table write with one that is effectively writing to two tables. The reason for including it is to demonstrate the difference in executing the same CQL write with and without a Materialized View.
In a realistic situation you would execute two writes on the client side, one to the base table and another to the Materialized View, or more likely a batch of two writes to ensure atomicity. According to DataStax performance tests, in such cases the built-in Materialized Views perform better than the manual denormalization (with batching), especially for single-row partitions.
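As a rough sketch of that manual alternative, assuming a hand-maintained table transactions_by_day_manual with the same layout as the view, the application would issue something like:
BEGIN BATCH
    INSERT INTO cc_transactions (userid, year, month, day, id, card, amount, status)
        VALUES ('Bob', 2017, 2, 7, 6, '2222-2222-2222-2222', -5, 'COMPLETED');
    INSERT INTO transactions_by_day_manual (year, month, day, userid, id, card, amount, status)
        VALUES (2017, 2, 7, 'Bob', 6, '2222-2222-2222-2222', -5, 'COMPLETED');
APPLY BATCH;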

Deleting and mutating data

Deletes and updates generally work the way you would expect. Given the following state:
> select * from cc_transactions where userid = 'Bob' and year = 2017;

 userid | year | month | day | id | amount | card                | status
--------+------+-------+-----+----+--------+---------------------+-----------
    Bob | 2017 |     2 |   6 |  3 |    -17 | 2222-2222-2222-2222 | COMPLETED
    Bob | 2017 |     2 |   7 |  4 |    -32 | 2222-2222-2222-2222 | COMPLETED
    Bob | 2017 |     2 |   7 |  5 |   1200 | 2222-2222-2222-2222 | COMPLETED

> select * from transactions_by_day where year = 2017 and month = 2 and day = 7;

 year | month | day | userid | id | amount | card                | status
------+-------+-----+--------+----+--------+---------------------+-----------
 2017 |     2 |   7 |    Bob |  4 |    -32 | 2222-2222-2222-2222 | COMPLETED
 2017 |     2 |   7 |    Bob |  5 |   1200 | 2222-2222-2222-2222 | COMPLETED
If we execute
update cc_transactions set status = 'PENDING' where userid = 'Bob' and year = 2017 and month = 2 and day = 7 and id = 5;
delete from cc_transactions where userid = 'Bob' and year = 2017 and month = 2 and day = 7 and id = 4;
Then
> select * from cc_transactions where userid = 'Bob' and year = 2017;

 userid | year | month | day | id | amount | card                | status
--------+------+-------+-----+----+--------+---------------------+-----------
    Bob | 2017 |     2 |   6 |  3 |    -17 | 2222-2222-2222-2222 | COMPLETED
    Bob | 2017 |     2 |   7 |  5 |   1200 | 2222-2222-2222-2222 |   PENDING

> select * from transactions_by_day where year = 2017 and month = 2 and day = 7;

 year | month | day | userid | id | amount | card                | status
------+-------+-----+--------+----+--------+---------------------+---------
 2017 |     2 |   7 |    Bob |  5 |   1200 | 2222-2222-2222-2222 | PENDING

Tombstones when updating

There are some unexpected cases worth keeping in mind. When updating a column that is part of a Materialized View’s primary key, Cassandra executes a DELETE and an INSERT to get the View into the correct state – thus generating a tombstone.
To demonstrate this, let’s suppose we want to be able to query transactions for a user by status:
CREATE MATERIALIZED VIEW transactions_by_status AS
    SELECT year, month, day, userid, id, amount, card, status
    FROM cc_transactions
    WHERE userid IS NOT NULL AND year IS NOT NULL AND month IS NOT NULL AND day IS NOT NULL AND id IS NOT NULL AND status IS NOT NULL
    PRIMARY KEY ((userid, year, status), month, day, id);
Truncating the base table and executing:
insert into cc_transactions (userid, year, month, day, id, card, amount, status) values ('Bob', 2017, 2, 6, 3, '2222-2222-2222-2222', -17, 'PENDING');
update cc_transactions set status = 'COMPLETED' where userid = 'Bob' and year = 2017 and month = 2 and day = 6 and id = 3;
After nodetool flush and taking a look at the SSTable of transactions_by_status:
[
  {
    "partition" : {
      "key" : [ "Bob", "2017", "COMPLETED" ],
      "position" : 0
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 39,
        "clustering" : [ "2", "6", "3" ],
        "liveness_info" : { "tstamp" : "2017-02-10T10:04:33.387990Z" },
        "cells" : [
          { "name" : "amount", "value" : "-17", "tstamp" : "2017-02-10T10:04:06.195953Z" },
          { "name" : "card", "value" : "2222-2222-2222-2222", "tstamp" : "2017-02-10T10:04:06.195953Z" }
        ]
      }
    ]
  },
  {
    "partition" : {
      "key" : [ "Bob", "2017", "PENDING" ],
      "position" : 88
    },
    "rows" : [
      {
        "type" : "row",
        "position" : 125,
        "clustering" : [ "2", "6", "3" ],
        "deletion_info" : { "marked_deleted" : "2017-02-10T10:04:06.195953Z", "local_delete_time" : "2017-02-10T10:04:33Z" },
        "cells" : [ ]
      }
    ]
  }
]
Notice the tombstoned row for partition (“Bob”, “2017”, “PENDING”) – this is a result of the initial insert and subsequent update. This is because by updating status in the base table, we have effectively created a new row in the Materialized View, deleting the old one.
This particular data structure is strongly discouraged: it will result in having a lot of tombstones in the (“Bob”, “2017”, “PENDING”) partition and is prone to hitting the tombstone warning and failure thresholds. Even worse – it is not immediately obvious that you are generating tombstones.
Instead of using a Materialized View, a SASI index is a much better choice for this particular case.
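For reference, a SASI index covering this query pattern could be declared along the following lines (the index name is illustrative):
CREATE CUSTOM INDEX cc_transactions_status_idx ON cc_transactions (status)
    USING 'org.apache.cassandra.index.sasi.SASIIndex';

-- Transactions for a user in a given year, filtered by status via the index:
SELECT * FROM cc_transactions
    WHERE userid = 'Bob' AND year = 2017 AND status = 'PENDING';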

Creating a Materialized View on existing datasets

It is also possible to create a Materialized View on a table that already contains data. In such cases Cassandra will build the View with all the existing data. As this might take a significant amount of time, depending on the amount of data held in the base table, it is possible to track the build status via the system.built_views metadata table.
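The build progress can be checked with a query along these lines; views appear here once their build has completed:
SELECT keyspace_name, view_name FROM system.built_views;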

Conclusion. Should I use it?

Materialized Views sound like a great feature. Pushing the responsibility for maintaining denormalizations for queries to the database is highly desirable and reduces the complexity of applications using Cassandra.
However, the current implementation has a number of shortcomings that make it difficult to use in most cases. Most importantly, the serious restrictions on the possible primary keys of Materialized Views limit their usefulness a great deal. In addition, any View must have a well-chosen partition key, and extra consideration needs to be given to unexpected tombstone generation in the Materialized Views.
And there is a definite performance hit compared to simple writes. If an application is sensitive to write latency and throughput, consider the options carefully (Materialized Views versus manual denormalisation) and do a proper performance testing exercise before making a choice.

To summarise – Materialized Views are an addition to CQL that is, in its current form, suitable for a few use cases: when write throughput is not a concern and the data model can be created within the functional limitations.
========================================================================================================


7 easy steps to Cassandra cluster migration

Here’s a recommended 7-step Cassandra cluster migration order-of-operations that will avoid any downtime:

1) Get your existing environment ready

First of all, make sure that your application is using a datacentre-aware load balancing policy and LOCAL_* consistency levels. Also, check that all of the keyspaces that will be copied over to the new cluster use NetworkTopologyStrategy as their replication strategy. It is also recommended that all keyspaces use this replication strategy when they are created, as altering it later can become complicated.
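As a sketch in cqlsh (keyspace name, datacentre name and replication factor are illustrative), you can inspect the current settings and, where needed, switch a keyspace to NetworkTopologyStrategy:
-- Inspect the replication settings of each keyspace:
SELECT keyspace_name, replication FROM system_schema.keyspaces;

-- Switch a keyspace to NetworkTopologyStrategy if it is not using it already:
ALTER KEYSPACE my_keyspace
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3};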

2) Create the new cluster

Now it’s time to create the new cluster that you’ll be migrating to. A few things to be careful about here: be sure that the new cluster and the original cluster use the same Cassandra version and cluster name. Also, the new datacenter name that you use must be different from the name of the existing datacenter.

3) Join the clusters together

To do this, first make any necessary firewall rule changes in order to allow the clusters to be joined, remembering that some changes to the source cluster may also be necessary. Then, change the new cluster’s seed nodes – and start them. Once this is done, the new cluster will be a second datacenter in the original cluster.

4) Change the replication settings 

Next, in the existing cluster, update the replication settings for the keyspaces that will be copied, so that data will now also be replicated to the new datacenter.
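For example, assuming the existing datacentre is named DC1 and the new one DC2 (names and replication factors are illustrative), the change for each keyspace being migrated would look something like:
ALTER KEYSPACE my_keyspace
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': 3, 'DC2': 3};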

5) Copy the data to the new cluster

Once the clusters are joined together, Cassandra will begin to replicate writes to the new cluster. It is still necessary, however, to copy any existing data over with nodetool rebuild. It is best practice to run rebuild on the new cluster one or two nodes at a time, so as not to place an overwhelming streaming load on the existing cluster.

6) Change over the application’s connection points

After all the rebuilds have completed, each of the clusters will contain a complete copy of the data being migrated, which Cassandra will keep in sync automatically. It is now time to change the initial connection points of your application over to the nodes in the new cluster. Once this is completed, all reads and writes will be served by the new cluster and then replicated to the original cluster. Finally, it is smart to run a full repair across the cluster to ensure that all data has been replicated successfully from the original.

7) Shut down the original cluster

Complete the process with a little post-migration clean up, removing the original cluster. First, change the firewall rules to disconnect the original cluster from the new one. Then, update the replication settings in the new cluster to cease replication of data to the original cluster. Lastly, shut the original cluster down.
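Continuing the illustrative example from step 4, dropping the original datacentre (DC1) from the replication settings would look like:
ALTER KEYSPACE my_keyspace
    WITH replication = {'class': 'NetworkTopologyStrategy', 'DC2': 3};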
There you have it: your Apache Cassandra deployment has been fully migrated, with zero downtime, low risk and in a manner completely seamless and transparent from the perspective of your end users.
=================================



Thursday, 8 February 2018

Apache Cassandra Compaction Strategies

Cassandra’s Write Path

To understand the importance of compactions in Cassandra, you must first understand how Cassandra writes data to disk. The Cassandra write path in a nutshell:
  1. Cassandra stores recent writes in memory (in a structure called the Memtable).
  2. When enough writes have been made, Cassandra flushes the Memtable to disk. Data on disk is stored in relatively simple data structures called Sorted String Tables (SSTable). At the most simplified level, an SSTable could be described as a sorted array of strings.
  3. Before writing a new SSTable, Cassandra merges and pre-sorts the data in the Memtable by Primary Key. In Cassandra a Primary Key consists of a Partition Key (the key that determines which node the data is stored on) and any Clustering Keys that have been defined.
  4. The SSTable is written to disk as a single contiguous write operation. SSTables are immutable: once they are written to disk they are not modified. Any update to data, or deletion of data, within an SSTable is written to a new SSTable. If data is updated regularly, Cassandra may need to read from multiple SSTables to retrieve a single row.
  5. Compaction operations occur periodically to re-write and combine SSTables. This is required because SSTables are immutable (no modifications once written to disk). Compactions prune deleted data and merge disparate row data into new SSTables in order to reclaim disk space and keep read operations optimised.
If you are unfamiliar with Cassandra’s write path, please read The Write Path to Compaction from DataStax.

Cassandra Compaction Strategies

Multiple Compaction Strategies are included with Cassandra, and each is optimized for a different use case:
  • SizeTiered Compaction Strategy (STCS) – This is the default compaction strategy. It triggers a compaction when multiple SSTables of a similar size are present, and additional parameters allow STCS to be tuned to increase or decrease the number of compactions it performs and to control how tombstones are handled. When to use: good for insert-heavy and general workloads.
  • Leveled Compaction Strategy (LCS) – This strategy groups SSTables into levels, each of which has a fixed size limit 10 times larger than the previous level. SSTables are of a fixed, relatively small size (160MB by default), so if Level 1 contains at most 10 SSTables, Level 2 will contain at most 100 SSTables. SSTables are guaranteed to be non-overlapping within each level; if any data overlaps when a table is promoted to the next level, the overlapping tables are re-compacted. For example, when Level 1 is filled, any new SSTables being added to that level are compacted together with any existing tables that contain overlapping data; if these compactions result in Level 1 now containing too many tables, the additional table(s) overflow to Level 2. When to use: best for read-heavy workloads (because tables within a level are non-overlapping, LCS guarantees that 90% of all reads can be satisfied from a single SSTable) or workloads with more updates than inserts.
  • DateTiered Compaction Strategy (DTCS) – This strategy is designed for time-series data. DTCS stores data written within the same time period in the same SSTable. Multiple SSTables written in the same time window are compacted together, up to a certain point, after which the SSTables are no longer compacted. SSTables are also configured with a TTL; SSTables older than the TTL are dropped, incurring zero compaction overhead. When to use: DTCS is highly performant and efficient, but only if the workload matches its strict requirements. DTCS is not designed for workloads with updates to old data or out-of-order inserts; if your workload does not fit these requirements, you may be better off using STCS with a bucketing key (such as hour/day/week) to break up your data.

Configuring a Compaction Strategy

Compaction options are configured at the table level via CQL. This allows each table to be optimised based on how it will be used. If a compaction strategy is not specified, SizeTieredCompactionStrategy is used.
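For example, switching an existing table to LCS, or setting the strategy at creation time, might look like the following (keyspace, table names and option values are illustrative):
ALTER TABLE my_keyspace.payments
    WITH compaction = {'class': 'LeveledCompactionStrategy', 'sstable_size_in_mb': 160};

-- Or specify the strategy when the table is created:
CREATE TABLE my_keyspace.events (
    id uuid PRIMARY KEY,
    payload text
) WITH compaction = {'class': 'SizeTieredCompactionStrategy', 'min_threshold': 4};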
