Mark McDonald Posted December 5, 2024 Report Posted December 5, 2024 We're in the process of migrating our database cluster from BigCouch to CouchDB 3.x and wanted to create a thread to document the changes required to keep a Kazoo 4.x cluster running and migration issues we've experienced. When running a replicate from BigCouch to CouchDB, we're getting a number of dropped connections that result in partially transferred databases. Re-running the replication will migrate more and more documents over until the replication is ultimately successful so we wrote a script to perform this (c# - see below). We've confirmed there are no network or firewall issues between clusters and they're even on the same subnet. Regardless, this script attached worked for us.

using System.Net.Http.Json;
using System.Web;
using Newtonsoft.Json;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using NLog;
using NLog.Config;
using NLog.Targets;
using NLog.Extensions.Logging;
using NLog.Conditions;

/// <summary>
/// Console tool that lists every database on <see cref="SourceHost"/> and asks the
/// <c>/_replicate</c> endpoint on <see cref="TargetHost"/> to copy each one, re-sending
/// the request until it succeeds or <see cref="MaxReplicationAttempts"/> is reached.
/// </summary>
class Program
{
    const string SourceHost = "x.x.x.x";
    const string TargetHost = "x.x.x.x";
    const string TargetUser = "username";
    const string TargetPass = "password";
    const int MaxReplicationAttempts = 50;

    // Pause after a non-success response before the same database is tried again.
    static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(15);

    /// <summary>
    /// Fetches the database list from the source, then replicates each database in turn
    /// and logs a summary of successes and failures.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>
    /// 1 when the database list cannot be fetched from the source; otherwise 0,
    /// even if some databases failed to replicate (those are reported in the log).
    /// </returns>
    static async Task<int> Main(string[] args)
    {
        var serviceCollection = new ServiceCollection();
        ConfigureServices(serviceCollection);

        // ServiceProvider is IDisposable; release it when Main exits.
        using var serviceProvider = serviceCollection.BuildServiceProvider();
        var logger = serviceProvider.GetRequiredService<ILogger<Program>>();

        List<string> databases = [];
        using (var sourceClient = new HttpClient())
        {
            try
            {
                using var response = await sourceClient.GetAsync($"http://{SourceHost}:5984/_all_dbs");
                if (!response.IsSuccessStatusCode)
                {
                    logger.LogError("Listing databases on {sourceHost} returned: {status}", SourceHost, response.ReasonPhrase);
                    return 1;
                }

                var jsonString = await response.Content.ReadAsStringAsync();
                databases = JsonConvert.DeserializeObject<List<string>>(jsonString) ?? [];
                logger.LogInformation("Received {dbCount} database strings", databases.Count);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error getting databases from source host: {sourceHost}", SourceHost);
                return 1;
            }
        }

        // These three databases are never sent to /_replicate.
        databases.Remove("_users");
        databases.Remove("_replicator");
        databases.Remove("_global_changes");

        List<string> failedReplications = [];
        var successCount = 0;
        var failureCount = 0;

        // NOTE(review): this callback accepts ANY server certificate. It has no effect on the
        // http:// URLs used below, but it disables TLS validation if they are ever switched to https.
        var handler = new HttpClientHandler
        {
            ServerCertificateCustomValidationCallback = (sender, cert, chain, sslPolicyErrors) => true
        };

        // One client is shared by every replication request instead of a new one per database.
        using var httpClient = new HttpClient(handler);
        httpClient.DefaultRequestHeaders.UserAgent.ParseAdd("CouchDB-Replication-Tool/1.0");

        // Basic authentication for the request sent to the target's /_replicate endpoint.
        var base64Auth = Convert.ToBase64String(System.Text.Encoding.UTF8.GetBytes($"{TargetUser}:{TargetPass}"));
        httpClient.DefaultRequestHeaders.Authorization =
            new System.Net.Http.Headers.AuthenticationHeaderValue("Basic", base64Auth);

        // Accept any response media type.
        httpClient.DefaultRequestHeaders.Accept.Add(
            new System.Net.Http.Headers.MediaTypeWithQualityHeaderValue("*/*"));

        // The endpoint URL itself carries no credentials (they travel in the header above);
        // the "target" URL inside the request body does, so its userinfo part is escaped
        // to survive characters such as '@', ':' or '/'.
        var replicateUrl = $"http://{TargetHost}:5984/_replicate";
        var targetUserInfo = $"{Uri.EscapeDataString(TargetUser)}:{Uri.EscapeDataString(TargetPass)}";

        foreach (var database in databases)
        {
            logger.LogInformation("----------------------------------------\n");

            var databaseEncoded = HttpUtility.UrlEncode(database);
            var replicationRequest = new
            {
                source = $"http://{SourceHost}:5984/{databaseEncoded}",
                target = $"http://{targetUserInfo}@{TargetHost}:5984/{databaseEncoded}",
                create_target = true,
            };
            var jsonContent = JsonConvert.SerializeObject(replicationRequest);

            logger.LogInformation("Starting Replication of {Database} database from {SourceHost} to {TargetHost}", database, SourceHost, TargetHost);

            if (await TryReplicateAsync(httpClient, replicateUrl, jsonContent, database, logger))
            {
                successCount++;
            }
            else
            {
                failureCount++;
                failedReplications.Add(database);
            }

            logger.LogInformation("Done replicating {Database}", database);
        }

        logger.LogInformation("Replication completed with {successCount} successes and {failureCount} failures", successCount, failureCount);
        if (failedReplications.Count > 0)
        {
            logger.LogWarning("Failed to replicate the following databases: {failedReplications}", string.Join("\n", failedReplications));
        }

        return 0;
    }

    /// <summary>
    /// Posts one replication request, repeating it after <see cref="RetryDelay"/> while the
    /// response status is not a success, up to <see cref="MaxReplicationAttempts"/> attempts.
    /// </summary>
    /// <param name="httpClient">Client already carrying the target's authentication header.</param>
    /// <param name="replicateUrl">URL of the target's <c>/_replicate</c> endpoint.</param>
    /// <param name="jsonContent">Serialized replication request body.</param>
    /// <param name="database">Database name, used for log messages only.</param>
    /// <param name="logger">Logger for progress and errors.</param>
    /// <returns>
    /// <c>true</c> as soon as one attempt returns a success status; <c>false</c> when an
    /// exception is thrown (no further attempts are made) or every attempt was rejected.
    /// </returns>
    static async Task<bool> TryReplicateAsync(
        HttpClient httpClient,
        string replicateUrl,
        string jsonContent,
        string database,
        ILogger<Program> logger)
    {
        for (var attempt = 1; attempt <= MaxReplicationAttempts; attempt++)
        {
            try
            {
                // A fresh content object per attempt. Content-Length is left to StringContent,
                // which derives it from the encoded bytes; jsonContent.Length counts UTF-16
                // characters and is wrong as soon as the body contains non-ASCII text.
                using var content = new StringContent(jsonContent, System.Text.Encoding.UTF8, "application/json");
                using var response = await httpClient.PostAsync(replicateUrl, content);
                logger.LogInformation("Replicate request returned: {status}", response.ReasonPhrase);

                if (response.IsSuccessStatusCode)
                {
                    // Returning here keeps a success on the last allowed attempt from also
                    // being reported as "max attempts reached".
                    return true;
                }

                logger.LogWarning("Replicate request failed for {Database}", database);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Error replicating {Database}", database);
                return false;
            }

            // No point in waiting after the final attempt.
            if (attempt < MaxReplicationAttempts)
            {
                await Task.Delay(RetryDelay);
            }
        }

        logger.LogError("Max replication attempts reached for {Database}", database);
        return false;
    }

    /// <summary>
    /// Builds an NLog configuration with a single colored console target that receives
    /// every level from Trace to Fatal, and registers NLog as the only logging provider.
    /// </summary>
    /// <param name="services">Service collection that receives the logging registration.</param>
    static void ConfigureServices(IServiceCollection services)
    {
        var config = new LoggingConfiguration();

        // Console Target
        var consoleTarget = new ColoredConsoleTarget("console")
        {
            Layout = "${level:uppercase=true}|${message} ${exception:format=tostring}",
            EnableAnsiOutput = true,
        };

        // Color only the level word at the start of each line, one rule per level.
        (string Text, string Level, ConsoleOutputColor Color)[] levelHighlights =
        [
            ("INFO", "Info", ConsoleOutputColor.Green),
            ("ERROR", "Error", ConsoleOutputColor.Red),
            ("WARN", "Warn", ConsoleOutputColor.Yellow),
        ];

        foreach (var (text, level, color) in levelHighlights)
        {
            consoleTarget.WordHighlightingRules.Add(new ConsoleWordHighlightingRule
            {
                Text = text,
                Condition = ConditionParser.ParseExpression($"level = LogLevel.{level}"),
                ForegroundColor = color,
                IgnoreCase = false,
                WholeWords = true
            });
        }

        /*
        // Set colors for different log levels
        consoleTarget.RowHighlightingRules.Add(new ConsoleRowHighlightingRule(
            condition: ConditionParser.ParseExpression("level == LogLevel.Info"),
            foregroundColor: ConsoleOutputColor.Green,
            backgroundColor: ConsoleOutputColor.White));
        consoleTarget.RowHighlightingRules.Add(new ConsoleRowHighlightingRule(
            condition: ConditionParser.ParseExpression("level == LogLevel.Error"),
            foregroundColor: ConsoleOutputColor.Red,
            backgroundColor: ConsoleOutputColor.White));
        consoleTarget.RowHighlightingRules.Add(new ConsoleRowHighlightingRule(
            condition: ConditionParser.ParseExpression("level == LogLevel.Warn"),
            foregroundColor: ConsoleOutputColor.Yellow,
            backgroundColor: ConsoleOutputColor.White));
        */

        config.AddTarget(consoleTarget);
        config.AddRule(NLog.LogLevel.Trace, NLog.LogLevel.Fatal, consoleTarget);
        LogManager.Configuration = config;

        services.AddLogging(loggingBuilder =>
        {
            loggingBuilder.ClearProviders();
            loggingBuilder.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Trace);
            loggingBuilder.AddNLog(config);
        });
    }
}

In addition, CouchDB no longer listens on 5986 so an HAProxy redirect is required to keep that functioning. RuhNet helped with that and it's below: frontend couch-5986-admin-port bind 127.0.0.1:15986 default_backend couch-redir-node-admin-port backend couch-redir-node-admin-port balance roundrobin #HAProxy < 2.0 uncomment the following #reqrep ^([^\ :]*)\ /(.*) \1\ /_node/_local/\2 #HAProxy 2.0 and above uncomment the following #http-request replace-uri ^/(.*) /_node/_local/\1 server couch1 172.31.12.34:5984 check server couch2 172.31.23.45:5984 check server couch3 172.31.34.56:5984 check Lastly, CouchDB no longer supports the Iterator function and needs to be replaced with .forEach.
Per Ruhnet, the following needs to be done but we have yet to test this. We're writing a script that will look through all documents and check for occurrences of Iterator so they can be replaced. { "_id": "_design/trunkstore", "language": "javascript", "views": { "crossbar_listing": { "map": "function(doc) { if (doc.pvt_type != 'sys_info' || doc.pvt_deleted) return; emit(doc._id, {'realm': doc.account.auth_realm}); }", "reduce": "_count" }, "lookup_did": { "map": "function(doc) { if(doc.pvt_type != 'sys_info' || doc.pvt_deleted ) return; var realm = doc.account.auth_realm; if(doc.servers) { doc.servers.forEach(function(srv) { var auth_clone = JSON.parse(JSON.stringify(srv.auth)); auth_clone.auth_realm = realm; if (srv.options.enabled != false && srv.DIDs) { for (var did in srv.DIDs) { emit(did, { 'callerid_server': srv.callerid || '', 'callerid_account': doc.callerid || '', 'e911_callerid_server': srv.e911_callerid || '', 'e911_callerid_account': doc.e911_callerid || '', 'auth': auth_clone, 'DID_Opts': srv.DIDs[did], 'inbound_format': srv.inbound_format || 'npan', 'server': srv.options, 'account': doc.account}); } } }) } }" }, "lookup_user_flags": { "map": "function(doc) { if(doc.pvt_type != 'sys_info') return; var realm = doc.account.auth_realm; if(doc.call_restriction) { var call_restriction = JSON.parse(JSON.stringify(doc.call_restriction)) }; if(doc.servers) { var acct_clone = JSON.parse(JSON.stringify(doc.account)); doc.servers.forEach(function(srv) { if (srv.auth) { var srv_clone = JSON.parse(JSON.stringify(srv)); srv_clone.auth.auth_realm = realm; emit([realm.toLowerCase(), srv_clone.auth.auth_user.toLowerCase()], {\"server\": srv_clone, \"account\": acct_clone, \"call_restriction\": call_restriction}); } }) }}" }, "lookup_did.old": { "map": "function(doc) { if(doc.pvt_type != 'sys_info' || doc.pvt_deleted ) return; var realm = doc.account.auth_realm; if(doc.servers) { var srvs = Iterator(doc.servers); for (var srv in srvs) { var auth_clone = 
JSON.parse(JSON.stringify(srv[1].auth)); auth_clone.auth_realm = realm; if (srv[1].enabled != false && srv[1].DIDs) { var DIDs = Iterator(srv[1].DIDs); for (var DID in DIDs) { emit(DID[0], { 'callerid_server': srv[1].callerid || '', 'callerid_account': doc.callerid || '', 'e911_callerid_server': srv[1].e911_callerid || '', 'e911_callerid_account': doc.e911_callerid || '', 'auth': auth_clone, 'DID_Opts': DID[1], 'inbound_format': srv[1].inbound_format || 'npan', 'server': srv[1].options, 'account': doc.account}); } } } } }" }, "lookup_user_flags.old": { "map": "function(doc) { if(doc.pvt_type != 'sys_info') return; var realm = doc.account.auth_realm; if(doc.call_restriction) { var call_restriction = JSON.parse(JSON.stringify(doc.call_restriction)) }; if(doc.servers) { var acct_clone = JSON.parse(JSON.stringify(doc.account)); var srvs = Iterator(doc.servers); for (var srv in srvs) { if (srv[1].auth) { var srv_clone = JSON.parse(JSON.stringify(srv[1])); srv_clone.auth.auth_realm = realm; emit([realm.toLowerCase(), srv_clone.auth.auth_user.toLowerCase()], {\"server\": srv_clone, \"account\": acct_clone, \"call_restriction\": call_restriction}); } } }}" } } } Quote
RuhNet Posted December 6, 2024 Report Posted December 6, 2024 No need to search for other occurrences of Iterator() — unless you have custom stuff that uses it, the Trunkstore view is the only place it appears in the Kazoo v4.3 codebase. Nice replication script—I use a simple one written in Bash but yours is more informative. One thing you can do is add some delay (minutes) between replication requests, which can keep you from overwhelming a server. I’ve found that if I just machine-gun style replicate a bunch of DBs sometimes RAM and CPU get slammed. Also, if worst comes to worst and you have too many issues letting Couch replicate itself, what you can do is a manual clone—in other words instead of writing a script to tell Couch what DBs to replicate, you write something that pulls each doc individually and creates it on the target machine. This way takes longer of course, but can be useful if there’s corrupted data or some other issue preventing a normal replication from succeeding fully, and you also get an internal reset of revisions on the target, since although the data is the same, it is a “new” document on the target side, and doesn’t have all the previous revision tombstones hanging around. Also, although you aren’t doing this, for the benefit of other readers, you can upgrade BigCouch to CouchDB v3, BUT you must first upgrade to v2, and then from there to v3. Quote
Recommended Posts
Join the conversation
You can post now and register later. If you have an account, sign in now to post with your account.