Posted

We're in the process of migrating our database cluster from BigCouch to CouchDB 3.x and wanted to start a thread to document the changes required to keep a Kazoo 4.x cluster running, along with the migration issues we've experienced.

When running a replication from BigCouch to CouchDB, we're getting a number of dropped connections that result in partially transferred databases. Re-running the replication migrates more documents each time until it ultimately succeeds, so we wrote a C# script (see below) to automate the retries. We've confirmed there are no network or firewall issues between the clusters; they're even on the same subnet. Regardless, the script below worked for us.

using System.Web;
using Newtonsoft.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NLog;
using NLog.Config;
using NLog.Targets;
using NLog.Extensions.Logging;
using NLog.Conditions;

class Program
{
    const string SourceHost = "x.x.x.x";
    const string TargetHost = "x.x.x.x";
    const string TargetUser = "username";
    const string TargetPass = "password";
    const int MaxReplicationAttempts = 50;

    static async Task<int> Main(string[] args)
    {
        // Set up a service collection
        var serviceCollection = new ServiceCollection();
        ConfigureServices(serviceCollection);

        // Build the service provider
        var serviceProvider = serviceCollection.BuildServiceProvider();

        // Get the logger
        var logger = serviceProvider.GetRequiredService<ILogger<Program>>();

        var ListOfDatabases = new List<string>();

        using (var httpClient = new HttpClient())
        {
            try
            {
                var response = await httpClient.GetAsync($"http://{SourceHost}:5984/_all_dbs");
                if (!response.IsSuccessStatusCode)
                {
                    return 1;
                }
                var jsonString = await response.Content.ReadAsStringAsync();
                ListOfDatabases = JsonConvert.DeserializeObject<List<string>>(jsonString) ?? [];
                logger.LogInformation("Received {dbCount} database strings", ListOfDatabases.Count);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error getting databases from source host: {sourceHost}", SourceHost);
                return 1;
            }
        }

        ListOfDatabases.Remove("_users");
        ListOfDatabases.Remove("_replicator");
        ListOfDatabases.Remove("_global_changes");


        List<string> FailedReplications = [];

        int SuccessCount = 0;
        int FailureCount = 0;
        foreach (var Database in ListOfDatabases)
        {
            logger.LogInformation("----------------------------------------\n");
            var handler = new HttpClientHandler
            {
                // Accept any TLS certificate (intra-cluster migration use only)
                ServerCertificateCustomValidationCallback = (sender, cert, chain, sslPolicyErrors) => true
            };
            using var httpClient = new HttpClient(handler);

            httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("CouchDB-Replication-Tool/1.0");
            // Add Basic Authentication header
            var authString = $"{TargetUser}:{TargetPass}";
            var base64Auth = Convert.ToBase64String(System.Text.Encoding.ASCII.GetBytes(authString));
            httpClient.DefaultRequestHeaders.Authorization = new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", base64Auth);

            // Set the Accept header to accept all types
            httpClient.DefaultRequestHeaders.Accept.Add(
                new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("*/*"));

            
            // Credentials for the _replicate request itself go in the Authorization header
            var replicateUrl = $"http://{TargetHost}:5984/_replicate";
            var DatabaseEncoded = HttpUtility.UrlEncode(Database);
            var replicationRequest = new
            {
                source = $"http://{SourceHost}:5984/{DatabaseEncoded}",
                // The target URL keeps inline credentials because the CouchDB replicator
                // must authenticate against the target endpoint separately
                target = $"http://{TargetUser}:{TargetPass}@{TargetHost}:5984/{DatabaseEncoded}",
                create_target = true,
            };

            bool ReplicationCompleted = false;
            logger.LogInformation("Starting Replication of {Database} database from {SourceHost} to {TargetHost}", Database, SourceHost, TargetHost);
            var jsonContent = JsonConvert.SerializeObject(replicationRequest);

            int Attempts = 0;
            while (!ReplicationCompleted)
            {
                Attempts++;
                try
                {
                    // Build a fresh request body on each attempt; StringContent sets
                    // Content-Length from the UTF-8 byte count automatically.
                    var content = new StringContent(jsonContent, System.Text.Encoding.UTF8, "application/json");
                    var response = await httpClient.PostAsync(replicateUrl, content);
                    logger.LogInformation("Replicate request returned: {status}", response.ReasonPhrase);

                    if (!response.IsSuccessStatusCode)
                    {
                        logger.LogWarning("Replicate request failed for {Database}", Database);
                        await Task.Delay(TimeSpan.FromSeconds(15)); // Wait 15 seconds before retrying
                    }
                    else
                    {
                        SuccessCount++;
                        ReplicationCompleted = true;
                    }
                }
                catch (Exception ex)
                {
                    FailureCount++;
                    FailedReplications.Add(Database);
                    logger.LogError(ex, "Error replicating {Database}", Database);
                    break;
                }

                if(Attempts >= MaxReplicationAttempts)
                {
                    FailureCount++;
                    FailedReplications.Add(Database);
                    logger.LogError("Max replication attempts reached for {Database}", Database);
                    break;
                }
            }
            logger.LogInformation("Done replicating {Database}", Database);
        }

        logger.LogInformation("Replication completed with {successCount} successes and {failureCount} failures", SuccessCount, FailureCount);
        if(FailedReplications.Count > 0)
        {
            logger.LogWarning("Failed to replicate the following databases: {failedReplications}", string.Join("\n", FailedReplications));
        }

        return 0;
    }

    static void ConfigureServices(IServiceCollection services)
    {
        var config = new LoggingConfiguration();
        
        // Console Target
        var consoleTarget = new ColoredConsoleTarget("console")
        {
            Layout = "${level:uppercase=true}|${message} ${exception:format=tostring}",
            EnableAnsiOutput = true,
        };

        consoleTarget.WordHighlightingRules.Add(new ConsoleWordHighlightingRule
        {
            Text = "INFO",
            Condition = ConditionParser.ParseExpression("level == LogLevel.Info"),
            ForegroundColor = ConsoleOutputColor.Green,
            IgnoreCase = false,
            WholeWords = true
        });
        consoleTarget.WordHighlightingRules.Add(new ConsoleWordHighlightingRule
        {
            Text = "ERROR",
            Condition = ConditionParser.ParseExpression("level == LogLevel.Error"),
            ForegroundColor = ConsoleOutputColor.Red,
            IgnoreCase = false,
            WholeWords = true
        });
        consoleTarget.WordHighlightingRules.Add(new ConsoleWordHighlightingRule
        {
            Text = "WARN",
            Condition = ConditionParser.ParseExpression("level == LogLevel.Warn"),
            ForegroundColor = ConsoleOutputColor.Yellow,
            IgnoreCase = false,
            WholeWords = true
        });
        
        config.AddTarget(consoleTarget);
        config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal, consoleTarget);

        LogManager.Configuration = config;

        services.AddLogging(loggingBuilder =>
        {
            loggingBuilder.ClearProviders();
            loggingBuilder.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Trace);
            loggingBuilder.AddNLog(config);
        });
    }
}

In addition, CouchDB 3.x no longer listens on port 5986; the node-local admin endpoints now live under /_node/_local/ on 5984, so an HAProxy rewrite is required to keep that functioning. RuhNet helped with that and the config is below:

frontend couch-5986-admin-port
    bind 127.0.0.1:15986
    default_backend couch-redir-node-admin-port

backend couch-redir-node-admin-port
    balance roundrobin

    # HAProxy < 2.0: uncomment the following
    #reqrep ^([^\ :]*)\ /(.*)     \1\ /_node/_local/\2

    # HAProxy 2.0 and above: uncomment the following
    #http-request replace-uri ^/(.*)     /_node/_local/\1

    server couch1 172.31.12.34:5984 check
    server couch2 172.31.23.45:5984 check
    server couch3 172.31.34.56:5984 check
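
To verify the rewrite, you can request a node-local endpoint through the proxy and compare it against the explicit /_node/_local/ path on 5984. A minimal sketch in the same style as the script above (host, port, and admin credentials are placeholders; with roundrobin the proxy may answer from a different node, so small differences are possible):

using System.Net.Http.Headers;
using System.Text;

// Sanity check for the HAProxy rewrite: the old admin-port style URL should
// return the same thing as the explicit /_node/_local/ path on 5984.
using var client = new HttpClient();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(
    "Basic", Convert.ToBase64String(Encoding.ASCII.GetBytes("admin:password")));

// Through the proxy (rewritten to /_node/_local/_config)
var viaProxy = await client.GetStringAsync("http://127.0.0.1:15986/_config");

// Direct equivalent on one of the couch nodes
var direct = await client.GetStringAsync("http://172.31.12.34:5984/_node/_local/_config");

Console.WriteLine(viaProxy == direct ? "Rewrite OK" : "Responses differ (check which node answered)");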

Lastly, CouchDB 3.x's JavaScript engine no longer supports the legacy Iterator() function, which needs to be replaced with .forEach(). Per RuhNet, the following needs to be done, but we have yet to test this. We're writing a script that will look through all design documents and check for occurrences of Iterator() so they can be replaced (a rough sketch follows the design doc below).

{
  "_id": "_design/trunkstore",
  "language": "javascript",
  "views": {
    "crossbar_listing": {
      "map": "function(doc) { if (doc.pvt_type != 'sys_info' || doc.pvt_deleted) return; emit(doc._id, {'realm': doc.account.auth_realm}); }",
      "reduce": "_count"
    },
    "lookup_did": {
      "map": "function(doc) { if(doc.pvt_type != 'sys_info' || doc.pvt_deleted ) return; var realm = doc.account.auth_realm; if(doc.servers) { doc.servers.forEach(function(srv) { var auth_clone = JSON.parse(JSON.stringify(srv.auth)); auth_clone.auth_realm = realm; if (srv.options.enabled != false && srv.DIDs) { for (var did in srv.DIDs) { emit(did, { 'callerid_server': srv.callerid || '', 'callerid_account': doc.callerid || '', 'e911_callerid_server': srv.e911_callerid || '', 'e911_callerid_account': doc.e911_callerid || '', 'auth': auth_clone, 'DID_Opts': srv.DIDs[did], 'inbound_format': srv.inbound_format || 'npan', 'server': srv.options, 'account': doc.account}); } } }) } }"
    },
    "lookup_user_flags": {
      "map": "function(doc) { if(doc.pvt_type != 'sys_info') return; var realm = doc.account.auth_realm; if(doc.call_restriction) { var call_restriction = JSON.parse(JSON.stringify(doc.call_restriction)) }; if(doc.servers) { var acct_clone = JSON.parse(JSON.stringify(doc.account)); doc.servers.forEach(function(srv) { if (srv.auth) { var srv_clone = JSON.parse(JSON.stringify(srv)); srv_clone.auth.auth_realm = realm; emit([realm.toLowerCase(), srv_clone.auth.auth_user.toLowerCase()], {\"server\": srv_clone, \"account\": acct_clone, \"call_restriction\": call_restriction}); } }) }}"
    },
    "lookup_did.old": {
      "map": "function(doc) { if(doc.pvt_type != 'sys_info' || doc.pvt_deleted ) return; var realm = doc.account.auth_realm; if(doc.servers) { var srvs = Iterator(doc.servers); for (var srv in srvs) { var auth_clone = JSON.parse(JSON.stringify(srv[1].auth)); auth_clone.auth_realm = realm; if (srv[1].enabled != false && srv[1].DIDs) { var DIDs = Iterator(srv[1].DIDs); for (var DID in DIDs) { emit(DID[0], { 'callerid_server': srv[1].callerid || '', 'callerid_account': doc.callerid || '', 'e911_callerid_server': srv[1].e911_callerid || '', 'e911_callerid_account': doc.e911_callerid || '', 'auth': auth_clone, 'DID_Opts': DID[1], 'inbound_format': srv[1].inbound_format || 'npan', 'server': srv[1].options, 'account': doc.account}); } } } } }"
    },
    "lookup_user_flags.old": {
      "map": "function(doc) { if(doc.pvt_type != 'sys_info') return; var realm = doc.account.auth_realm; if(doc.call_restriction) { var call_restriction = JSON.parse(JSON.stringify(doc.call_restriction)) }; if(doc.servers) { var acct_clone = JSON.parse(JSON.stringify(doc.account)); var srvs = Iterator(doc.servers); for (var srv in srvs) { if (srv[1].auth) { var srv_clone = JSON.parse(JSON.stringify(srv[1])); srv_clone.auth.auth_realm = realm; emit([realm.toLowerCase(), srv_clone.auth.auth_user.toLowerCase()], {\"server\": srv_clone, \"account\": acct_clone, \"call_restriction\": call_restriction}); } } }}"
    }
  }
}
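
As a rough sketch of the scan we have in mind (untested; the host is a placeholder, and you may need to add Basic auth headers), it lists every database, fetches its design docs, and flags any whose source mentions Iterator(:

using System.Web;
using Newtonsoft.Json.Linq;

// Rough sketch (untested): list every database, fetch its design documents
// via _all_docs, and flag any whose source contains the legacy Iterator() call.
const string Host = "x.x.x.x";
using var client = new HttpClient();

var dbs = JArray.Parse(await client.GetStringAsync($"http://{Host}:5984/_all_dbs"));
foreach (var db in dbs)
{
    var dbEncoded = HttpUtility.UrlEncode(db.ToString());
    // The startkey/endkey range restricts _all_docs to _design/* documents
    var url = $"http://{Host}:5984/{dbEncoded}/_all_docs?include_docs=true&startkey=%22_design/%22&endkey=%22_design0%22";
    var result = JObject.Parse(await client.GetStringAsync(url));

    foreach (var row in result["rows"]!)
    {
        if (row["doc"]?.ToString().Contains("Iterator(") == true)
            Console.WriteLine($"{db}: {row["id"]} uses Iterator()");
    }
}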


Posted

No need to search for other occurrences of Iterator() — unless you have custom stuff that uses it, the Trunkstore view is the only place it appears in the Kazoo v4.3 codebase.

Nice replication script! I use a simple one written in Bash, but yours is more informative. One thing you can do is add some delay (minutes) between replication requests, which can keep from overwhelming a server. I've found that if I just machine-gun replicate a bunch of DBs, sometimes RAM and CPU get slammed.
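
For example, a pause at the end of the per-database foreach loop in the script above would do it (the two-minute figure is just a guess; tune it to your hardware):

// Hypothetical pause at the end of the foreach loop in the script above,
// giving the servers a breather between databases.
await Task.Delay(TimeSpan.FromMinutes(2));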

Also, if worst comes to worst and you have too many issues letting Couch replicate itself, you can do a manual clone: instead of writing a script to tell Couch which DBs to replicate, you write something that pulls each doc individually and creates it on the target machine. This takes longer, of course, but can be useful if there's corrupted data or some other issue preventing a normal replication from completing. You also get a reset of revisions on the target, since although the data is the same, each document is "new" on the target side and doesn't have all the previous revision tombstones hanging around.
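
A minimal sketch of that manual clone approach (hosts, credentials, and the database name are placeholders; large databases would want paging on _all_docs, and attachments need separate handling):

using System.Net.Http.Headers;
using System.Text;
using Newtonsoft.Json.Linq;

// Rough sketch: pull every doc from the source and create it fresh on the
// target, dropping _rev so the target starts a clean revision history.
const string Source = "http://x.x.x.x:5984";
const string Target = "http://x.x.x.x:5984";
const string Db = "some_database";

using var client = new HttpClient();
client.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue(
    "Basic", Convert.ToBase64String(Encoding.ASCII.GetBytes("username:password")));

// Make sure the target database exists (a 412 response just means it already does)
await client.PutAsync($"{Target}/{Db}", null);

var all = JObject.Parse(await client.GetStringAsync($"{Source}/{Db}/_all_docs?include_docs=true"));
foreach (var row in all["rows"]!)
{
    var doc = (JObject)row["doc"]!;
    doc.Remove("_rev"); // drop the source revision so the doc is "new" on the target

    var body = new StringContent(doc.ToString(), Encoding.UTF8, "application/json");
    var docId = Uri.EscapeDataString((string)doc["_id"]!);
    var resp = await client.PutAsync($"{Target}/{Db}/{docId}", body);
    if (!resp.IsSuccessStatusCode)
        Console.WriteLine($"Failed {doc["_id"]}: {resp.StatusCode}");
}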

Also, although you aren't doing this, for the benefit of other readers: you can upgrade BigCouch to CouchDB v3, BUT you must first upgrade to v2, and then from there to v3.
