From f4c547abf39a598e7841ea833910ab5a65facf8d Mon Sep 17 00:00:00 2001 From: poudro Date: Tue, 6 Nov 2012 17:50:38 +0100 Subject: [PATCH 1/2] add oplog_replay to prevent excessive time spent in oplog queries since index on capped collections don't work so well... --- mmm/triggers.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/mmm/triggers.py b/mmm/triggers.py index 987ff1f..d7673e1 100644 --- a/mmm/triggers.py +++ b/mmm/triggers.py @@ -10,7 +10,6 @@ class Triggers(object): def __init__(self, connection, checkpoint): self._conn = connection self._oplog = self._conn.local.oplog.rs - self._oplog.ensure_index('ts') self._callbacks = defaultdict(list) self.checkpoint = checkpoint @@ -19,7 +18,7 @@ def run(self): yield self.checkpoint spec = dict(ts={'$gt': self.checkpoint}) q = self._oplog.find( - spec, tailable=True, await_data=True) + spec, tailable=True, await_data=True, oplog_replay=True) found=False # log.debug('Query on %s', self._oplog) for op in q.sort('$natural'): From d306810d6d38b0b52cfea7f6d0dcb6269c27cc3b Mon Sep 17 00:00:00 2001 From: poudro Date: Mon, 3 Dec 2012 18:37:28 +0100 Subject: [PATCH 2/2] update for new version of mongo new oplog AND to replay starting at last mark point when mmm crashes --- mmm/slave.py | 30 ++++++++++++++++-------------- scripts/mmm | 33 +++++++++++++++++++++++++++------ 2 files changed, 43 insertions(+), 20 deletions(-) diff --git a/mmm/slave.py b/mmm/slave.py index 0e74387..d255258 100644 --- a/mmm/slave.py +++ b/mmm/slave.py @@ -3,8 +3,9 @@ import uuid import logging +import bson import gevent - +#import pprint from .triggers import Triggers log = logging.getLogger(__name__) @@ -27,9 +28,8 @@ class ReplicationSlave(object): } ''' - def __init__(self, topology, name, connect=None, timestamp=None): + def __init__(self, topology, name, connect=None): if connect: self.connect = connect - if timestamp: self.timestamp = timestamp self._topology = topology self.name = name topo = topology[name] @@ -41,6 +41,7 @@ def __init__(self, topology, name, connect=None, timestamp=None): self._coll = self._conn.local[MMM_DB_NAME] self._config = {} self._greenlets = [] + self._master_checkpoint = None def connect(self, *args, **kwargs): '''Connect to a mongod server. Factored into a method so we can delay @@ -50,14 +51,6 @@ def connect(self, *args, **kwargs): from pymongo import Connection return Connection(*args, **kwargs) - def timestamp(self, *args, **kwargs): - '''Create a bson.Timestamp. Factored into a method so we can delay - importing socket until gevent.monkey.patch_all() is called. You could - also insert another timestamp function to set options if you so desire. - ''' - import bson - return bson.Timestamp(*args, **kwargs) - def start(self, checkpoint=None): for gl in self._greenlets: gl.kill() @@ -70,12 +63,21 @@ def start(self, checkpoint=None): self.replicate, master_uri, checkpoint)) def load_config(self): + from pymongo import Connection self._config = {} name_by_id = dict( (sconf['id'], name) for name, sconf in self._topology.items()) for master in self._coll.find(): - self._config[name_by_id[master['_id']]] = master + if master['_id'] != 'master_checkpoint': + self._config[name_by_id[master['_id']]] = master + + master_uri = self._topology[name_by_id[master['_id']]]['uri'] + conn = Connection(master_uri) + coll = conn.local[MMM_DB_NAME] + for cp in coll.find(dict(_id='master_checkpoint')): + self._master_checkpoint = cp['ts'] + conn.disconnect() def clear_config(self): self._config = {} @@ -139,7 +141,7 @@ def replicate(self, master_name, checkpoint=None): checkpoint = master_repl_config.get('checkpoint') if checkpoint is None: # By default, start replicating as of NOW - checkpoint = self.timestamp(long(time.time()), 0) + checkpoint = bson.Timestamp(long(time.time()), 0) triggers = Triggers(conn, checkpoint) for repl in master_repl_config['replication']: triggers.register( @@ -159,7 +161,7 @@ def _replicate_to_trigger(self, src_id, dst): src_id = uuid.UUID(src_id) db, cname = dst.split('.', 1) collection = self._conn[db][cname] - def trigger(ts, h, op, ns, o, o2=None, b=False): + def trigger(ts, h, op, ns, o, v, o2=None, b=False): log.info('%s <= %s: %s %s', self.id, src_id, op, ns) if op == 'i': if o.get(MMM_REPL_FLAG) == self.id: diff --git a/scripts/mmm b/scripts/mmm index b0596d3..21e4c3d 100644 --- a/scripts/mmm +++ b/scripts/mmm @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/python import gevent.monkey gevent.monkey.patch_all() @@ -6,11 +6,11 @@ import json import bson import argparse import logging.config - +#import pprint import yaml +import time def main(): - global log parser = argparse.ArgumentParser() parser.add_argument( '-l', '--logging', dest='logging', default=None, @@ -109,12 +109,33 @@ def run_replicate(config, servers, args): def run_main(config, servers, args): log = logging.getLogger('mmm') for s in servers.values(): - s.start() + s.start(s._master_checkpoint) + + masters = {} + for name,s in servers.items(): + for master in s._coll.find(): + if master['_id'] != 'master_checkpoint': + masters[master['_id']] = True + + for name,s in servers.items(): + try: + if masters[s._topology[name]['id']] == True: + masters[s._topology[name]['id']] = s + except: + True + +# pprint.pprint('masters output') +# for name,m in masters.items(): +# pprint.pprint(name) +# pprint.pprint(m._coll) +# pprint.pprint('masters output') + while True: gevent.sleep(5) + ts = bson.Timestamp(long(time.time()), 1) log.info('=== mark ===') - - + for name,m in masters.items(): + m._coll.update( dict(_id="master_checkpoint"), { '$set': { 'ts': ts } }, upsert=True )