# Copyright 2016 - 2018 Ternaris.
# SPDX-License-Identifier: AGPL-3.0-only
import re
import sys
from collections import defaultdict, namedtuple
from itertools import groupby
import capnp # pylint: disable=unused-import
import marv_api as marv
import marv_nodes
from marv_api import DatasetInfo
from marv_ros import genpy
from marv_ros import rosbag
from marv_ros.rosbag import _get_message_type
from .bag_capnp import Bagmeta, Message # pylint: disable=import-error
class Baginfo(namedtuple('Baginfo', 'filename basename prefix timestamp idx')):
    """Components of a bag filename of the form ``[prefix_][timestamp_][idx].bag``.

    Fields:
        filename: The filename exactly as passed to :meth:`parse`.
        basename: ``filename`` without the trailing ``.bag``.
        prefix: Everything in front of timestamp/index, or ``None`` if
            nothing is left after removing them.
        timestamp: ``YYYY-MM-DD-HH-MM-SS`` component as string, or ``None``.
        idx: Trailing numeric index as ``int``, or ``None``.
    """

    @classmethod
    def parse(cls, filename):
        """Split a ``.bag`` filename into prefix, timestamp and index.

        Args:
            filename (str): Name of a bag file; must end with ``.bag``.

        Returns:
            Baginfo: The parsed components.

        Raises:
            AssertionError: If ``filename`` does not end with ``.bag``.
        """
        assert filename.endswith('.bag'), filename
        basename = filename[:-4]
        # At most the last two underscore-separated components can be
        # timestamp and index; everything before stays in parts[0].
        parts = basename.rsplit('_', 2)

        # A trailing number only counts as index if something precedes it,
        # otherwise a basename like "0" would leave nothing to inspect below.
        # isdecimal() matches exactly what int() is able to convert.
        idx = None
        if len(parts) > 1 and parts[-1].isdecimal():
            idx = int(parts.pop())

        timestamp = None
        if re.match(r'\d{4}(?:-\d{2}){5}', parts[-1]):
            timestamp = parts.pop()

        # Re-join whatever is left so that no component of the name is lost
        # when the last component turned out to be neither index nor timestamp.
        prefix = '_'.join(parts) if parts else None
        return cls(filename, basename, prefix, timestamp, idx)
def scan(dirpath, dirnames, filenames):  # pylint: disable=unused-argument
    """Scan for sets of ROS bag files.

    Bags suffixed with a consecutive index are grouped into sets::

        foo_0.bag
        foo_1.bag
        foo_3.bag
        foo_4.bag

    results in::

        foo   [foo_0.bag, foo_1.bag]
        foo_3 [foo_3.bag]
        foo_4 [foo_4.bag]

    In this example the bag with index 2 is missing which results in
    foo_3 and foo_4 to be individual sets with one bag each.

    The timestamps used by ``rosbag record`` are stripped from the
    name given to sets, but are kept for the remaining individual sets
    in case a bag is missing::

        foo_2018-01-12-14-05-12_0.bag
        foo_2018-01-12-14-45-23_1.bag
        foo_2018-01-12-14-55-42_3.bag

    results in::

        foo                       [foo_2018-01-12-14-05-12_0.bag,
                                   foo_2018-01-12-14-45-23_1.bag]
        foo_2018-01-12-14-55-42_3 [foo_2018-01-12-14-55-42_3.bag]

    For more information on scanners see :any:`marv_api.scanner`.

    Args:
        dirpath (str): The path to the directory currently being
            scanned.
        dirnames (list of str): Sorted list of subdirectories of the
            directory currently being scanned. Change this in-place to
            control further traversal.
        filenames (list of str): Sorted list of files within the
            directory currently being scanned.

    Returns:
        A list of :class:`marv_api.DatasetInfo` instances mapping
        set of files to dataset names. Absolute filenames must
        start with :paramref:`.dirpath`, relative filenames are
        automatically prefixed with it.

    See :ref:`cfg_c_scanner` config key.
    """
    # Bags are walked in reverse order, so a complete set is seen from its
    # highest index down to index 0.
    infos = [Baginfo.parse(x) for x in reversed(filenames) if x.endswith('.bag')]
    bags = []  # candidate members of the set being assembled, lowest index first
    datasets = []

    def flush_as_individual_sets():
        """Turn all pending bags into one-bag datasets named after the bag."""
        datasets[0:0] = [DatasetInfo(x.basename, [x.filename]) for x in bags]
        bags.clear()

    for prefix, group in groupby(infos, lambda x: x.prefix):
        prev_idx = None
        for bag in group:
            expected_idx = bag.idx if prev_idx is None else prev_idx - 1
            mixed_timestamps = bool(bags) and \
                (bags[0].timestamp is None) != (bag.timestamp is None)
            # A gap in the index sequence or a mix of timestamped and
            # non-timestamped names means the pending bags do not form a set.
            if bag.idx != expected_idx or mixed_timestamps:
                flush_as_individual_sets()

            # Everything is prepended to keep the result in filename order.
            bags.insert(0, bag)
            prev_idx = bag.idx
            if bag.idx == 0:
                # Reached the first bag of a set: emit it under the stripped name.
                datasets.insert(0, DatasetInfo(prefix or bag.timestamp,
                                               [x.filename for x in bags]))
                bags.clear()
            elif bag.idx is None:
                # Bags without index are always a dataset of their own.
                assert len(bags) == 1, bags
                datasets.insert(0, DatasetInfo(bag.basename, [bag.filename]))
                bags.clear()

        # Pending bags never reached index 0; they must not be carried over
        # into the set of the next prefix.
        flush_as_individual_sets()
    return datasets
def read_messages(paths, topics=None, start_time=None, end_time=None):
    """Iterate chronologically raw BagMessage for topic from paths.

    One reader is opened per path and the readers are merged: in every
    step the pending entry with the smallest timestamp is yielded.

    Args:
        paths: Iterable of bag file paths, each passed to ``rosbag.Bag``.
        topics: Forwarded to ``Bag.read_messages``.
        start_time: Forwarded to ``Bag.read_messages``.
        end_time: Forwarded to ``Bag.read_messages``.

    Yields:
        The message part of the ``(time, message)`` pairs produced by
        ``Bag.read_messages(..., raw=True)``.

    Raises:
        AssertionError: If a yielded message would be older than the
            previously yielded one.
    """
    bags = {}
    try:
        for path in paths:
            bags[path] = rosbag.Bag(path)
        gens = {
            path: bag.read_messages(topics=topics, start_time=start_time,
                                    end_time=end_time, raw=True)
            for path, bag in bags.items()
        }
        msgs = {}  # path -> next not yet yielded (time, message) of that bag
        prev_time = genpy.Time(0)
        while True:
            # Refill the slot of every bag that has no pending entry.
            for key in gens.keys() - msgs.keys():
                try:
                    msgs[key] = next(gens[key])
                except StopIteration:
                    # Bag exhausted: close it right away and stop tracking it.
                    bags.pop(key).close()
                    del gens[key]
            if not msgs:
                break
            next_key = min(msgs.items(), key=lambda x: x[1][0])[0]
            next_time, next_msg = msgs.pop(next_key)
            assert next_time >= prev_time, (repr(next_time), repr(prev_time))
            yield next_msg
            prev_time = next_time
    finally:
        # Also reached when the consumer stops early or an error occurs;
        # make sure no bag is left open in that case.
        for bag in bags.values():
            bag.close()
@marv.node(Message, group='ondemand')
@marv.input('dataset', marv_nodes.dataset)
@marv.input('bagmeta', bagmeta)
def raw_messages(dataset, bagmeta):  # pylint: disable=redefined-outer-name
    """Stream messages from a set of bag files.

    Requested streams come in two forms:

    - ``topic:type`` (either part may be ``*``): a group is created which
      holds one stream per matching topic.
    - ``topic``: a single stream for that topic; if the topic is not
      listed in ``bagmeta.topics`` the stream is finished right away.

    Afterwards the dataset's ``.bag`` files are read once and every
    message is published to all streams registered for its topic.
    """
    # pylint: disable=too-many-locals
    bagmeta, dataset = yield marv.pull_all(bagmeta, dataset)
    bagtopics = bagmeta.topics
    connections = bagmeta.connections
    paths = [x.path for x in dataset.files if x.path.endswith('.bag')]
    requested = yield marv.get_requested()

    alltopics = set()             # topics that have to be read from the bags
    bytopic = defaultdict(list)   # topic -> streams receiving its messages
    groups = {}                   # NOTE(review): filled below but never read here

    # Requests of the form "topic:type" become groups of streams.
    for name in [x.name for x in requested if ':' in x.name]:
        reqtop, reqtype = name.split(':')
        # BUG: topic with more than one type is not supported
        topics = [
            con.topic for con in connections
            if reqtop in ('*', con.topic) and reqtype in ('*', con.datatype)
        ]
        group = groups[name] = yield marv.create_group(name)
        create_stream = group.create_stream
        for topic in topics:
            # BUG: topic with more than one type is not supported
            # pylint: disable=stop-iteration-return
            con = next(x for x in connections if x.topic == topic)
            # TODO: start/end_time per topic?
            header = {'start_time': bagmeta.start_time,
                      'end_time': bagmeta.end_time,
                      'msg_count': con.msg_count,
                      'msg_type': con.datatype,
                      'msg_type_def': con.msg_def,
                      'msg_type_md5sum': con.md5sum,
                      'topic': topic}
            stream = yield create_stream(topic, **header)
            bytopic[topic].append(stream)
        alltopics.update(topics)
        if group:
            yield group.finish()

    # Plain topic requests become individual streams, unless the topic is
    # already served through one of the groups above.
    for topic in [x.name for x in requested if ':' not in x.name]:
        if topic in alltopics:
            continue

        # BUG: topic with more than one type is not supported
        con = next((x for x in connections if x.topic == topic), None)
        # TODO: start/end_time per topic?
        # Without a connection for the topic the header carries empty defaults.
        header = {'start_time': bagmeta.start_time,
                  'end_time': bagmeta.end_time,
                  'msg_count': con.msg_count if con else 0,
                  'msg_type': con.datatype if con else '',
                  'msg_type_def': con.msg_def if con else '',
                  'msg_type_md5sum': con.md5sum if con else '',
                  'topic': topic}
        stream = yield marv.create_stream(topic, **header)
        if topic not in bagtopics:
            yield stream.finish()
        bytopic[topic].append(stream)
        alltopics.add(topic)

    if not alltopics:
        return

    # BUG: topic with more than one type is not supported
    for topic, raw, timestamp in read_messages(paths, topics=list(alltopics)):
        dct = {'data': raw[1], 'timestamp': timestamp.to_nsec()}
        for stream in bytopic[topic]:
            yield stream.msg(dct)
# ``messages`` is a second module-level name bound to the same object as
# ``raw_messages``.
messages = raw_messages # pylint: disable=invalid-name
# Record with the fields md5sum, datatype and msg_def; get_message_type()
# fills it from a stream's stored type information and passes it on to
# _get_message_type().
_ConnectionInfo = namedtuple('_ConnectionInfo', 'md5sum datatype msg_def')
def get_message_type(stream):
    """ROS message type from definition stored for stream.

    Args:
        stream: Object providing ``msg_type``, ``msg_type_def`` and
            ``msg_type_md5sum`` attributes.

    Returns:
        The result of ``_get_message_type`` for the stored definition, or
        ``None`` if any of the three attributes is empty.
    """
    # All three pieces are needed to reconstruct the type.
    if not (stream.msg_type and stream.msg_type_def and stream.msg_type_md5sum):
        return None

    info = _ConnectionInfo(md5sum=stream.msg_type_md5sum,
                           datatype=stream.msg_type,
                           msg_def=stream.msg_type_def)
    return _get_message_type(info)