Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 16 additions & 14 deletions mmm/slave.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import uuid
import logging

import bson
import gevent

#import pprint
from .triggers import Triggers

log = logging.getLogger(__name__)
Expand All @@ -27,9 +28,8 @@ class ReplicationSlave(object):
}
'''

def __init__(self, topology, name, connect=None, timestamp=None):
def __init__(self, topology, name, connect=None):
if connect: self.connect = connect
if timestamp: self.timestamp = timestamp
self._topology = topology
self.name = name
topo = topology[name]
Expand All @@ -41,6 +41,7 @@ def __init__(self, topology, name, connect=None, timestamp=None):
self._coll = self._conn.local[MMM_DB_NAME]
self._config = {}
self._greenlets = []
self._master_checkpoint = None

def connect(self, *args, **kwargs):
'''Connect to a mongod server. Factored into a method so we can delay
Expand All @@ -50,14 +51,6 @@ def connect(self, *args, **kwargs):
from pymongo import Connection
return Connection(*args, **kwargs)

def timestamp(self, *args, **kwargs):
'''Create a bson.Timestamp. Factored into a method so we can delay
importing socket until gevent.monkey.patch_all() is called. You could
also insert another timestamp function to set options if you so desire.
'''
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is a clear reason not to remove this method.

import bson
return bson.Timestamp(*args, **kwargs)

def start(self, checkpoint=None):
for gl in self._greenlets:
gl.kill()
Expand All @@ -70,12 +63,21 @@ def start(self, checkpoint=None):
self.replicate, master_uri, checkpoint))

def load_config(self):
from pymongo import Connection
self._config = {}
name_by_id = dict(
(sconf['id'], name)
for name, sconf in self._topology.items())
for master in self._coll.find():
self._config[name_by_id[master['_id']]] = master
if master['_id'] != 'master_checkpoint':
self._config[name_by_id[master['_id']]] = master

master_uri = self._topology[name_by_id[master['_id']]]['uri']
conn = Connection(master_uri)
coll = conn.local[MMM_DB_NAME]
for cp in coll.find(dict(_id='master_checkpoint')):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't use dict for it: {'_id': 'master_checkpoint'}

self._master_checkpoint = cp['ts']
conn.disconnect()

def clear_config(self):
self._config = {}
Expand Down Expand Up @@ -139,7 +141,7 @@ def replicate(self, master_name, checkpoint=None):
checkpoint = master_repl_config.get('checkpoint')
if checkpoint is None:
# By default, start replicating as of NOW
checkpoint = self.timestamp(long(time.time()), 0)
checkpoint = bson.Timestamp(long(time.time()), 0)
triggers = Triggers(conn, checkpoint)
for repl in master_repl_config['replication']:
triggers.register(
Expand All @@ -159,7 +161,7 @@ def _replicate_to_trigger(self, src_id, dst):
src_id = uuid.UUID(src_id)
db, cname = dst.split('.', 1)
collection = self._conn[db][cname]
def trigger(ts, h, op, ns, o, o2=None, b=False):
def trigger(ts, h, op, ns, o, v, o2=None, b=False):
log.info('%s <= %s: %s %s', self.id, src_id, op, ns)
if op == 'i':
if o.get(MMM_REPL_FLAG) == self.id:
Expand Down
3 changes: 1 addition & 2 deletions mmm/triggers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ class Triggers(object):
def __init__(self, connection, checkpoint):
self._conn = connection
self._oplog = self._conn.local.oplog.rs
self._oplog.ensure_index('ts')
self._callbacks = defaultdict(list)
self.checkpoint = checkpoint

Expand All @@ -19,7 +18,7 @@ def run(self):
yield self.checkpoint
spec = dict(ts={'$gt': self.checkpoint})
q = self._oplog.find(
spec, tailable=True, await_data=True)
spec, tailable=True, await_data=True, oplog_replay=True)
found=False
# log.debug('Query on %s', self._oplog)
for op in q.sort('$natural'):
Expand Down
33 changes: 27 additions & 6 deletions scripts/mmm
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
#!/usr/bin/env python
#!/usr/bin/python
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is wrong

import gevent.monkey
gevent.monkey.patch_all()

import json
import bson
import argparse
import logging.config

#import pprint
import yaml
import time

def main():
global log
parser = argparse.ArgumentParser()
parser.add_argument(
'-l', '--logging', dest='logging', default=None,
Expand Down Expand Up @@ -109,12 +109,33 @@ def run_replicate(config, servers, args):
def run_main(config, servers, args):
log = logging.getLogger('mmm')
for s in servers.values():
s.start()
s.start(s._master_checkpoint)

masters = {}
for name,s in servers.items():
for master in s._coll.find():
if master['_id'] != 'master_checkpoint':
masters[master['_id']] = True

for name,s in servers.items():
try:
if masters[s._topology[name]['id']] == True:
masters[s._topology[name]['id']] = s
except:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean?


# pprint.pprint('masters output')
# for name,m in masters.items():
# pprint.pprint(name)
# pprint.pprint(m._coll)
# pprint.pprint('masters output')
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There shouldn't be any commented code. If you don't use it, remove it.


while True:
gevent.sleep(5)
ts = bson.Timestamp(long(time.time()), 1)
log.info('=== mark ===')


for name,m in masters.items():
m._coll.update( dict(_id="master_checkpoint"), { '$set': { 'ts': ts } }, upsert=True )



Expand Down