# -*- coding: utf-8 -*-
from __future__ import division
import itertools
import logging
import matplotlib.animation as animation
import matplotlib.pyplot as plt
import numpy as np
import six
from six.moves import queue
from tqdm import tqdm
from .config import CONFIG
from .octrees import OctreeVolume
from .postprocessing import Body
from .util import (
get_color_shader,
pad_dims,
WrappedViewer,
)
class Region(object):
    """A region (single seeded body) for flood filling.

    This object contains the necessary data to perform flood filling for a
    single body.

    Parameters
    ----------
    image : ndarray or diluvian.octrees.OctreeVolume
        Raw image data. If it is an octree, it is assumed all volumetric
        storage should also operate block-sparsely and there is no ground
        truth available.
    target : ndarray, optional
        Target mask probabilities (ground truth converted to network targets).
    seed_vox : ndarray, optional
        Coordinates of the seed voxel.
    mask : ndarray, optional
        Object prediction mask for output. Provided as an argument here in
        case resuming or extending an existing result.
    sparse_mask : bool, optional
        If true, force the predicted mask to be a block-sparse array instead
        of a dense ndarray. Note that if ``image`` is a
        ``diluvian.octrees.OctreeVolume``, the mask will be sparse regardless
        of this parameter.
    block_padding : str, optional
        Method to use to pad data when the network's input field of view
        extends outside the region bounds. This is passed to ``numpy.pad``.
        Defaults to ``None``, which indicates attempts to operate outside the
        region bounds are erroneous.

    Attributes
    ----------
    bias_against_merge : bool
        Whether to bias against merge by never overwriting mask probabilities
        less than 0.5 once they have been written.
    move_based_on_new_mask : bool
        Whether to generate moves based on the probabilities only in the newly
        predicted mask block (if true), or on the mask block once combined with
        the existing probability mask (if false).
    move_check_thickness : int
        Thickness in voxels to check around the move plane in each direction
        when determining which moves to queue. See ``get_moves`` method.
    """
@staticmethod
def from_subvolume(subvolume, **kwargs):
    """Build a region from a subvolume's image, label mask and seed.

    A boolean label mask is converted to network target values with
    ``mask_to_output_target``; any other label mask (including ``None``) is
    passed through unchanged as the target.

    Parameters
    ----------
    subvolume : object
        Object exposing ``image``, ``label_mask`` and ``seed`` attributes.
    **kwargs
        Passed to the ``Region`` constructor.

    Returns
    -------
    Region
    """
    label_mask = subvolume.label_mask
    # ``np.bool`` was only an alias of the builtin and has been removed from
    # NumPy; ``np.bool_`` is the boolean scalar type in every release.
    if label_mask is not None and np.issubdtype(label_mask.dtype, np.bool_):
        target = mask_to_output_target(label_mask)
    else:
        target = label_mask
    return Region(subvolume.image,
                  target=target,
                  seed_vox=subvolume.seed,
                  **kwargs)
@staticmethod
[docs] def from_subvolume_generator(subvolumes, **kwargs):
subvolumes = itertools.ifilter(lambda s: s.has_uniform_seed_margin(), subvolumes)
return itertools.imap(lambda v: Region.from_subvolume(v, **kwargs), subvolumes)
def __init__(self, image, target=None, seed_vox=None, mask=None, sparse_mask=False, block_padding=None):
    """Set up the move grid, mask storage and move queue for one body.

    See the class docstring for the meaning of the parameters.

    Raises
    ------
    AssertionError
        If ``seed_vox`` maps to a position outside the move bounds, or if a
        target is given and its value at the seed is not close to the
        configured ``v_true``.
    """
    self.block_padding = block_padding
    self.MOVE_DELTA = CONFIG.model.move_step
    self.queue = queue.PriorityQueue()
    self.visited = set()
    self.image = image
    self.bounds = np.array(image.shape, dtype=np.int64)

    # The move grid is aligned so that the seed voxel lies exactly on it.
    if seed_vox is None:
        self.MOVE_GRID_OFFSET = np.array([0, 0, 0], dtype=np.int64)
    else:
        self.MOVE_GRID_OFFSET = np.mod(seed_vox, self.MOVE_DELTA).astype(np.int64)

    # Inclusive range of grid positions whose input field of view fits
    # inside the image without padding.
    self.move_bounds = (
        np.ceil(np.true_divide((CONFIG.model.input_fov_shape - 1) // 2 - self.MOVE_GRID_OFFSET,
                               self.MOVE_DELTA)).astype(np.int64),
        self.vox_to_pos(np.array(self.bounds) - 1 - (CONFIG.model.input_fov_shape - 1) // 2),
    )
    self.move_check_thickness = CONFIG.model.move_check_thickness

    if mask is None:
        # NaN marks voxels that have not been predicted yet. ``np.nan`` is
        # used because the ``np.NAN`` alias no longer exists in NumPy 2.
        if isinstance(self.image, OctreeVolume):
            self.mask = OctreeVolume(self.image.leaf_shape, (np.zeros(3), self.bounds), 'float32')
            self.mask[:] = np.nan
        elif sparse_mask:
            self.mask = OctreeVolume(CONFIG.model.training_subv_shape, (np.zeros(3), self.bounds), 'float32')
            self.mask[:] = np.nan
        else:
            self.mask = np.full(self.bounds, np.nan, dtype=np.float32)
    else:
        self.mask = mask

    self.target = target
    self.bias_against_merge = False
    self.move_based_on_new_mask = False
    self.prioritize_proximity = CONFIG.model.move_priority == 'proximity'
    self.proximity = {}

    if seed_vox is None:
        # Without a seed, start from the middle of the move bounds.
        seed_pos = np.floor_divide(self.move_bounds[0] + self.move_bounds[1], 2)
    else:
        seed_pos = self.vox_to_pos(seed_vox)
        assert self.pos_in_bounds(seed_pos), \
            'Seed position (%s) must be in region move bounds (%s, %s).' % \
            (seed_vox, self.move_bounds[0], self.move_bounds[1])
    self.seed_pos = seed_pos
    self.queue.put((None, seed_pos))
    self.proximity[tuple(seed_pos)] = 1
    self.seed_vox = self.pos_to_vox(seed_pos)

    if self.target is not None:
        # The target may be smaller than the image; it is assumed to be
        # centered, so half the size difference is its offset.
        self.target_offset = (self.bounds - self.target.shape) // 2
        assert np.isclose(self.target[tuple(self.seed_vox - self.target_offset)], CONFIG.model.v_true), \
            'Seed position should be in target body.'

    self.mask[tuple(self.seed_vox)] = CONFIG.model.v_true
    self.visited.add(tuple(self.seed_pos))
def unfilled_copy(self):
    """Clone this region in an initial state without any filling.

    The clone shares this region's image and target, is seeded at the same
    voxel, and keeps the padding mode, mask sparsity and the
    ``bias_against_merge`` / ``move_based_on_new_mask`` flags.

    Returns
    -------
    Region
    """
    # Padding and sparsity must carry over: without ``block_padding`` the
    # clone can reject a seed that was valid for this region, and without
    # ``sparse_mask`` it would allocate a dense mask for a sparse region.
    copy = Region(self.image,
                  target=self.target,
                  seed_vox=self.pos_to_vox(self.seed_pos),
                  sparse_mask=isinstance(self.mask, OctreeVolume),
                  block_padding=self.block_padding)
    copy.bias_against_merge = self.bias_against_merge
    copy.move_based_on_new_mask = self.move_based_on_new_mask
    return copy
def to_body(self):
    """Threshold the probability mask into a ``Body``.

    Voxels whose probability is at least the configured ``t_final`` are in
    the body. NaN voxels compare false and are therefore excluded.

    Returns
    -------
    Body
        Built from the boolean mask and the seed voxel.
    """
    def threshold(a):
        return a >= CONFIG.model.t_final

    if isinstance(self.mask, OctreeVolume):
        # ``np.bool_`` replaces the removed ``np.bool`` alias.
        hard_mask = self.mask.map_copy(np.bool_, threshold, threshold)
    else:
        hard_mask = threshold(self.mask)

    return Body(hard_mask, self.pos_to_vox(self.seed_pos))
def vox_to_pos(self, vox):
    """Convert voxel coordinates to move-grid positions.

    The grid offset is subtracted and the result is floor-divided by the
    move step, so voxels between grid points map to the lower position.

    Returns
    -------
    ndarray of int64
    """
    relative = vox - self.MOVE_GRID_OFFSET
    grid_index = np.floor_divide(relative, self.MOVE_DELTA)
    return grid_index.astype(np.int64)
def pos_to_vox(self, pos):
    """Convert move-grid positions to voxel coordinates.

    The position is scaled by the move step and shifted by the grid offset.

    Returns
    -------
    ndarray
    """
    scaled = pos * self.MOVE_DELTA
    return scaled.astype(np.int64) + self.MOVE_GRID_OFFSET
def pos_in_bounds(self, pos):
    """Check whether a move-grid position may be visited.

    With padding enabled, a position is valid if it is non-negative and its
    voxel lies strictly inside the region bounds. Without padding, it must
    lie within the inclusive ``move_bounds`` range.

    Returns
    -------
    bool
    """
    if self.block_padding is not None:
        below_upper = np.all(np.less(self.pos_to_vox(pos), self.bounds))
        return below_upper and np.all(pos >= 0)

    lower, upper = self.move_bounds[0], self.move_bounds[1]
    return np.all(np.greater_equal(pos, lower)) and \
        np.all(np.less_equal(pos, upper))
def get_block_bounds(self, vox, shape, offset=None):
    """Compute clamped block extents and the padding needed around them.

    The block is centered on ``vox``. Its extents are clamped to lie inside
    the volume, and the amount by which the unclamped block would extend
    past each side is returned separately as padding.

    Parameters
    ----------
    vox : ndarray
        Center of the block in voxel coordinates.
    shape : ndarray
        Shape of the block.
    offset : ndarray, optional
        Margin between the region volume and the volume these bounds will
        be used with, for volumes smaller than the region (such as the
        target volume for contracted output shapes). The usable extent is
        the region bounds reduced by twice this offset. Defaults to zero.

    Returns
    -------
    block_min, block_max : ndarray
        Extents of the block in voxel coordinates, clamped to the volume.
    padding_pre, padding_post : ndarray
        How far the block extends outside the volume on the low and high
        sides respectively.
    """
    if offset is None:
        offset = np.zeros(3, dtype=vox.dtype)

    half = (shape - 1) // 2
    lower = vox - half
    upper = vox + half + 1

    # The volume being indexed loses ``offset`` voxels on each side.
    double_offset = offset + offset
    limit = self.bounds - double_offset

    padding_pre = np.maximum(0, -lower)
    padding_post = np.maximum(0, upper - limit)

    clamped_lower = np.maximum(0, lower)
    clamped_upper = np.minimum(upper, limit)

    return clamped_lower, clamped_upper, padding_pre, padding_post
def get_moves(self, mask):
    """Find the maximum probability on each of the six move faces.

    For each axis-aligned unit direction, a slab of the mask block located
    one move step from the block center is examined. The slab spans one
    move step either side of the center along the other two axes and
    extends ``move_check_thickness - 1`` voxels either side of the move
    plane along the move axis, which helps when an artifact affects only a
    single plane that happens to align with the move grid.

    Parameters
    ----------
    mask : ndarray
        Block of mask probabilities, usually of the shape specified by
        the configured ``output_fov_shape``.

    Returns
    -------
    list of dict
        One dict per direction, with ``move`` (ndarray unit vector) and
        ``v`` (maximum probability within that direction's slab).
    """
    center = np.asarray(mask.shape) // 2
    extra = self.move_check_thickness - 1
    directions = [(1, 0, 0), (-1, 0, 0),
                  (0, 1, 0), (0, -1, 0),
                  (0, 0, 1), (0, 0, -1)]

    candidates = []
    for direction in directions:
        step = np.array(direction)
        along = np.abs(step)
        # Along the move axis the sign flips so that the slab sits on the
        # face being moved towards; on the other axes the window spans
        # one move step either side of the center.
        lower = center - (1 - 2 * np.maximum(step, 0)) * self.MOVE_DELTA - along * extra
        upper = center + (1 + 2 * np.minimum(step, 0)) * self.MOVE_DELTA + along * extra + 1
        window = tuple(slice(lo, hi) for lo, hi in zip(lower, upper))
        candidates.append({'move': step, 'v': mask[window].max()})

    return candidates
def check_move_neighborhood(self, mask):
    """Given a mask block, check if any central voxels meet move threshold.

    Checks whether a cube one move in each direction from the mask center
    contains any probability greater than or equal to the configured move
    threshold ``t_move``. NaN voxels are ignored.

    Parameters
    ----------
    mask : ndarray
        Block of mask probabilities, usually of the shape specified by
        the configured ``output_fov_shape``.

    Returns
    -------
    bool
    """
    ctr = np.asarray(mask.shape) // 2
    neigh_min = ctr - self.MOVE_DELTA
    neigh_max = ctr + self.MOVE_DELTA + 1
    # The index must be a tuple: a ``map`` object is not a valid index on
    # Python 3 and NumPy no longer accepts a list of slices.
    neighborhood = mask[tuple(map(slice, neigh_min, neigh_max))]
    return np.nanmax(neighborhood) >= CONFIG.model.t_move
def add_mask(self, mask_block, mask_pos):
    """Merge a predicted mask block into the region mask and queue moves.

    The block is cropped to the region bounds, written into ``self.mask``
    (respecting ``bias_against_merge``), and each of the six neighbouring
    positions is queued if it is in bounds, unvisited, and its move-plane
    probability reaches the configured ``t_move``.

    Parameters
    ----------
    mask_block : ndarray
        Predicted mask probabilities centered on ``mask_pos``.
    mask_pos : ndarray
        Move-grid position of the block center.

    Raises
    ------
    AssertionError
        If the block extends outside the region and padding is disabled.
    """
    mask_vox = self.pos_to_vox(mask_pos)
    mask_min, mask_max, pad_pre, pad_post = self.get_block_bounds(mask_vox, np.asarray(mask_block.shape))

    if np.any(pad_pre) or np.any(pad_post):
        assert self.block_padding is not None, \
            'Position block extends out of region bounds, but padding is not enabled: {}'.format(mask_pos)
        # Drop the part of the prediction that falls outside the region.
        # ``None`` is needed for a zero post-pad because ``-0`` would give
        # an empty slice. The index is a tuple because NumPy no longer
        # accepts a list of slices.
        end = [-x if x != 0 else None for x in pad_post]
        mask_block = mask_block[tuple(map(slice, pad_pre, end))]

    current_mask = self.mask[mask_min[0]:mask_max[0],
                             mask_min[1]:mask_max[1],
                             mask_min[2]:mask_max[2]]

    if self.bias_against_merge:
        # Overwrite only unset voxels, voxels currently above 0.5, or
        # voxels whose new value is lower than the stored one.
        update_mask = np.isnan(current_mask) | (current_mask > 0.5) | np.less(mask_block, current_mask)
        current_mask[update_mask] = mask_block[update_mask]
    else:
        current_mask[:] = mask_block

    # Written back explicitly in case indexing returned a copy.
    self.mask[mask_min[0]:mask_max[0],
              mask_min[1]:mask_max[1],
              mask_min[2]:mask_max[2]] = current_mask

    if self.move_based_on_new_mask:
        move_check_block = mask_block
    else:
        move_check_block = current_mask

    # Restore the full block shape (zero-filled outside the region) so
    # that ``get_moves`` measures planes from the true block center.
    pad_width = list(zip(list(pad_pre), list(pad_post)))
    move_check_block = np.pad(move_check_block, pad_width, 'constant')
    new_moves = self.get_moves(move_check_block)

    if self.prioritize_proximity:
        proximity = self.proximity[tuple(mask_pos)] + 1
        del self.proximity[tuple(mask_pos)]
    else:
        proximity = None

    for move in new_moves:
        new_pos = mask_pos + move['move']
        if not self.pos_in_bounds(new_pos):
            continue
        if tuple(new_pos) not in self.visited and move['v'] >= CONFIG.model.t_move:
            self.visited.add(tuple(new_pos))
            priority = self.get_move_priority(new_pos, move['v'], proximity)
            self.queue.put((priority, tuple(new_pos)))
def get_move_priority(self, pos, value, proximity=None):
    """Compute the queue priority for a candidate move.

    The result depends on the configured ``move_priority``:

    * ``'proximity'``: the negated ``value`` divided by ``proximity``; the
      smaller of the stored and given proximity is also recorded for
      ``pos``.
    * ``'random'``: a number from ``np.random.rand()``.
    * anything else (descending): the negated ``value``.

    Parameters
    ----------
    pos : ndarray
        Move-grid position of the candidate move.
    value : float
        Mask probability that triggered the move.
    proximity : int, optional
        Proximity value for the move; used in ``'proximity'`` mode.

    Returns
    -------
    float
    """
    mode = CONFIG.model.move_priority

    if mode == 'random':
        return np.random.rand()

    if mode != 'proximity':  # descending
        return -value

    key = tuple(pos)
    known = self.proximity.get(key, proximity)
    self.proximity[key] = min(known, proximity)
    return -value / proximity
def get_next_block(self):
    """Pop the next usable move and assemble its network input blocks.

    Queued moves are popped until one is accepted. When the configured
    ``move_recheck`` is enabled, a move other than the seed is skipped (and
    removed from the visited set) if no mask probability near it still
    meets the move threshold.

    Returns
    -------
    dict or None
        ``None`` if the queue runs empty. Otherwise a dict with keys
        ``'image'``, ``'mask'``, ``'target'`` (``None`` when the region has
        no target) and ``'position'`` (move-grid position as an ndarray).

    Raises
    ------
    AssertionError
        If a block extends outside the region while padding is disabled,
        or if the assembled image or mask block does not have the
        configured ``input_fov_shape``.
    """
    mask_block = None

    while mask_block is None:
        try:
            queued_move = self.queue.get_nowait()
        except queue.Empty:
            return None
        next_pos = np.asarray(queued_move[1])
        next_vox = self.pos_to_vox(next_pos)
        block_min, block_max, pad_pre, pad_post = self.get_block_bounds(next_vox, CONFIG.model.input_fov_shape)
        assert self.block_padding is not None or not (np.any(pad_pre) or np.any(pad_post)), \
            'Position block extends out of region bounds, but padding is not enabled: {}'.format(next_pos)
        mask_block = self.mask[block_min[0]:block_max[0],
                               block_min[1]:block_max[1],
                               block_min[2]:block_max[2]].copy()

        # Unpredicted (NaN) voxels are presented to the network as false.
        mask_block[np.isnan(mask_block)] = CONFIG.model.v_false

        # Check that there is still some t_move threshold mask near the move.
        if CONFIG.model.move_recheck and not (
                np.array_equal(next_pos, self.seed_pos) or self.check_move_neighborhood(mask_block)):
            logging.debug('Skipping move: no threshold mask in cube around voxel %s', np.array_str(next_vox))
            # Remove from the visited set: move was not taken, but later
            # moves could queue it.
            self.visited.remove(tuple(next_pos))
            mask_block = None

    image_block = self.image[block_min[0]:block_max[0],
                             block_min[1]:block_max[1],
                             block_min[2]:block_max[2]]

    if np.any(pad_pre) or np.any(pad_post):
        assert self.block_padding is not None, \
            'Position block extends out of region bounds, but padding is not enabled: {}'.format(next_pos)
        # Materialize the pairs: on Python 3 a bare ``zip`` is a one-shot
        # iterator, which ``np.pad`` cannot use and which would be
        # exhausted before the second call.
        pad_width = list(zip(pad_pre, pad_post))
        image_block = np.pad(image_block, pad_width, self.block_padding)
        mask_block = np.pad(mask_block, pad_width, self.block_padding)

    if self.target is not None:
        block_min, block_max, pad_pre, pad_post = self.get_block_bounds(
            next_vox - self.target_offset, CONFIG.model.output_fov_shape, self.target_offset)
        target_block = self.target[block_min[0]:block_max[0],
                                   block_min[1]:block_max[1],
                                   block_min[2]:block_max[2]]
        if np.any(pad_pre) or np.any(pad_post):
            pad_width = list(zip(pad_pre, pad_post))
            target_block = np.pad(target_block, pad_width, self.block_padding)
    else:
        target_block = None

    assert image_block.shape == tuple(CONFIG.model.input_fov_shape), \
        'Image wrong shape: {}'.format(image_block.shape)
    assert mask_block.shape == tuple(CONFIG.model.input_fov_shape), \
        'Mask wrong shape: {}'.format(mask_block.shape)
    return {'image': image_block,
            'mask': mask_block,
            'target': target_block,
            'position': next_pos}
def prediction_metric(self, metric, threshold=True, **kwargs):
    """Evaluate a metric between the target and the predicted mask.

    Both volumes are cropped to the output fields of view spanned by the
    first and last move-bound positions. Unpredicted (NaN) mask voxels are
    treated as the configured ``v_false``. This reads ``self.target`` and
    ``self.target_offset``, so the region must have a target.

    Parameters
    ----------
    metric : callable
        Called as ``metric(target, pred, **kwargs)``.
    threshold : bool, optional
        If true, both volumes are binarized at the configured ``t_final``
        before being passed to the metric.
    **kwargs
        Passed to ``metric``.

    Returns
    -------
    object
        Whatever ``metric`` returns.
    """
    first_vox = self.pos_to_vox(self.move_bounds[0])
    last_vox = self.pos_to_vox(self.move_bounds[1])
    fov = CONFIG.model.output_fov_shape

    pred_min = self.get_block_bounds(first_vox, fov)[0]
    pred_max = self.get_block_bounds(last_vox, fov)[1]
    # Index with a tuple of slices; NumPy no longer accepts a list here.
    pred = self.mask[tuple(map(slice, pred_min, pred_max))].copy()
    pred[np.isnan(pred)] = CONFIG.model.v_false

    targ_min = self.get_block_bounds(first_vox - self.target_offset, fov, self.target_offset)[0]
    targ_max = self.get_block_bounds(last_vox - self.target_offset, fov, self.target_offset)[1]
    target = self.target[tuple(map(slice, targ_min, targ_max))]

    if threshold:
        target = target >= CONFIG.model.t_final
        pred = pred >= CONFIG.model.t_final

    return metric(target, pred, **kwargs)
def remask(self):
    """Reset the mask based on the seeded connected component.

    Everything outside the bounds of the seeded component is reset to NaN.
    Inside those bounds, voxels that are not part of the component are
    clipped to at most 0.9 times the configured ``t_move``.

    Returns
    -------
    bool
        ``False`` (leaving the mask untouched) if the seed is not in the
        thresholded body, ``True`` otherwise.
    """
    body = self.to_body()
    if not body.is_seed_in_mask():
        return False
    new_mask_bin, bounds = body.get_seeded_component(CONFIG.postprocessing.closing_shape)
    # ``np.bool_`` replaces the removed ``np.bool`` alias.
    new_mask_bin = new_mask_bin.astype(np.bool_)

    # Index with a tuple of slices; NumPy no longer accepts a list here.
    component_slices = tuple(map(slice, bounds[0], bounds[1]))
    mask_block = self.mask[component_slices].copy()
    # Clip any values not in the seeded connected component so that they
    # cannot generate moves when rechecking.
    mask_block[~new_mask_bin] = np.clip(mask_block[~new_mask_bin], None, 0.9 * CONFIG.model.t_move)

    # ``np.nan`` replaces the ``np.NAN`` alias removed in NumPy 2.
    self.mask[:] = np.nan
    self.mask[component_slices] = mask_block
    return True
class EarlyFillTermination(Exception):
    """Raised by ``fill`` when filling stops before the move queue is exhausted."""
def fill(self, model, progress=False, move_batch_size=1, max_moves=None, stopping_callback=None,
         remask_interval=None, generator=False):
    """Flood fill this region.

    Note this returns a generator, so must be iterated to start filling.

    Parameters
    ----------
    model : keras.models.Model
        Model to use for object prediction.
    progress : bool or int, optional
        Whether to display a progress bar. If an int, indicates the
        progress bar is nested and should appear at that level.
    move_batch_size : int, optional
        Number of moves to process in parallel. Note that in the algorithm
        as originally described this is 1, because otherwise moves'
        outputs may affect each other or the queue. Setting this higher
        can increase throughput.
    max_moves : int, optional
        Terminate filling once more than this many moves have been made,
        even if the queue is not empty.
    stopping_callback : function, optional
        Function called with this region every 100 moves (at batch
        granularity); filling terminates if it returns true.
    remask_interval : int, optional
        Frequency in moves to reset the mask to be based on the
        morphological seed connected component. This is useful to discourage
        long-running fills due to runaway merging. Only sensible when
        using move rechecking, proximity priority, and rejecting non-seeded
        connected components.
    generator : bool
        If true, each tuple of batch inputs and outputs will be yielded.

    Yields
    ------
    tuple
        Batch inputs and outputs if ``generator`` is true.

    Raises
    ------
    Region.EarlyFillTermination
        If filling was terminated early because the maximum number of
        moves was exceeded, the stopping callback returned true, or
        remasking found the seed outside the mask.
    """
    moves = 0
    last_check = 0
    last_remask = 0
    if remask_interval is None:
        remask_interval = float('inf')
    STOP_CHECK_INTERVAL = 100
    early_termination = False

    pbar = tqdm(desc='Move queue', position=progress) if progress else None

    # ``finally`` closes the progress bar even when prediction raises or
    # the caller abandons the generator part-way through.
    try:
        while not self.queue.empty():
            # Gather up to ``move_batch_size`` blocks, stopping early if
            # the queue drains; rechecking may reject moves and give None.
            batch_block_data = [self.get_next_block() for _ in
                                itertools.takewhile(lambda _: not self.queue.empty(), range(move_batch_size))]
            batch_block_data = [b for b in batch_block_data if b is not None]
            batch_moves = len(batch_block_data)
            if batch_moves == 0:
                break

            moves += batch_moves
            if pbar is not None:
                pbar.total = moves + self.queue.qsize()
                pbar.set_description(str(self.seed_vox) + ' Move ' + str(batch_block_data[-1]['position']))
                pbar.update(batch_moves)

            if stopping_callback is not None and moves - last_check >= STOP_CHECK_INTERVAL:
                last_check = moves
                if stopping_callback(self):
                    early_termination = True
                    break

            image_input = np.concatenate([pad_dims(b['image']) for b in batch_block_data])
            mask_input = np.concatenate([pad_dims(b['mask']) for b in batch_block_data])

            output = model.predict_on_batch({'image_input': image_input,
                                             'mask_input': mask_input})

            for ind, block_data in enumerate(batch_block_data):
                self.add_mask(output[ind, :, :, :, 0], block_data['position'])

            if generator:
                yield (batch_block_data, output)

            if max_moves is not None and moves > max_moves:
                early_termination = True
                break

            if moves - last_remask >= remask_interval:
                if not self.remask():
                    early_termination = True
                    break
                last_remask = moves
    finally:
        if pbar is not None:
            pbar.close()

    if early_termination:
        raise Region.EarlyFillTermination()
def fill_animation(self, movie_filename, *args, **kwargs):
    """Create an animated movie of the filling process for this region.

    Three orthogonal slices through the image and the mask are drawn side
    by side and follow the current move, interpolating the view between
    successive move positions.

    Parameters
    ----------
    movie_filename : str
        File name of the MP4 movie to be saved.
    *args, **kwargs
        Passed to ``fill``.

    Returns
    -------
    matplotlib.animation.FuncAnimation
        The animation, after it has been saved to ``movie_filename``.
    """
    dpi = 100
    fig = plt.figure(figsize=(1920/dpi, 1080/dpi), dpi=dpi)
    fig.patch.set_facecolor('black')
    axes = {
        'xy': fig.add_subplot(1, 3, 1),
        'xz': fig.add_subplot(1, 3, 2),
        'zy': fig.add_subplot(1, 3, 3),
    }
    # Array axis that is held fixed in each plane.
    planes = {'xy': 0, 'xz': 1, 'zy': 2}

    def get_plane(arr, vox, plane):
        """Extract the 2D slice of ``arr`` through ``vox`` for ``plane``."""
        return {
            'xy': lambda a, v: a[v[0], :, :],
            'xz': lambda a, v: a[:, v[1], :],
            'zy': lambda a, v: np.transpose(a[:, :, v[2]]),
        }[plane](arr, np.round(vox).astype(np.int64))

    def get_hv(vox, plane):
        """Pick the coordinates used for the ``h`` and ``v`` guide lines."""
        rel = vox
        return {
            'xy': {'h': rel[1], 'v': rel[2]},
            'xz': {'h': rel[0], 'v': rel[2]},
            'zy': {'h': rel[1], 'v': rel[0]},
        }[plane]

    def get_aspect(plane):
        """Aspect ratio for ``plane`` from the configured resolution."""
        return {
            'xy': CONFIG.volume.resolution[1] / CONFIG.volume.resolution[2],
            'xz': CONFIG.volume.resolution[0] / CONFIG.volume.resolution[2],
            'zy': CONFIG.volume.resolution[1] / CONFIG.volume.resolution[0],
        }[plane]

    images = {
        'last': None,
        'image': {},
        'mask': {},
    }
    lines = {
        'v': {},
        'h': {},
        'bl': {},
        'bt': {},
    }
    current_vox = self.pos_to_vox(self.seed_pos)
    margin = CONFIG.model.input_fov_shape // 2
    for plane, ax in six.iteritems(axes):
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        image_data = get_plane(self.image, current_vox, plane)
        im = ax.imshow(image_data, cmap='gray')
        im.set_clim([0, 1])
        images['image'][plane] = im

        mask_data = get_plane(self.mask, current_vox, plane)
        im = ax.imshow(mask_data, cmap='jet', alpha=0.8)
        im.set_clim([0, 1])
        images['mask'][plane] = im

        aspect = get_aspect(plane)
        lines['h'][plane] = ax.axhline(y=get_hv(current_vox - margin, plane)['h'], color='w')
        lines['v'][plane] = ax.axvline(x=get_hv(current_vox + margin, plane)['v'], color='w')
        lines['bl'][plane] = ax.axvline(x=get_hv(current_vox - margin, plane)['v'], color='w')
        lines['bt'][plane] = ax.axhline(y=get_hv(current_vox + margin, plane)['h'], color='w')

        ax.set_aspect(aspect)

    images['last'] = np.round(current_vox).astype(np.int64)

    plt.tight_layout()

    fill_generator = self.fill(*args, generator=True, **kwargs)

    def update_fn(vox):
        """Advance the fill when the view reaches its goal, then redraw."""
        mask_changed = False
        if np.array_equal(np.round(vox).astype(np.int64), update_fn.next_pos_vox):
            try:
                batch_block_data, output = six.next(fill_generator)
                block_data = batch_block_data[0]
                mask_changed = True
            except (StopIteration, Region.EarlyFillTermination):
                block_data = None

            if block_data is not None:
                update_fn.next_pos_vox = self.pos_to_vox(block_data['position'])
                if not np.array_equal(np.round(vox).astype(np.int64), update_fn.next_pos_vox):
                    # Queue intermediate frames gliding to the next move.
                    p = update_fn.next_pos_vox - vox
                    steps = np.linspace(0, 1, 16)
                    interp_vox = vox + np.outer(steps, p)
                    for row in interp_vox:
                        update_fn.vox_queue.put(row)
                else:
                    update_fn.vox_queue.put(vox)

        vox_round = np.round(vox).astype(np.int64)
        changed_images = []
        for plane, im in six.iteritems(images['image']):
            if vox_round[planes[plane]] != images['last'][planes[plane]]:
                image_data = get_plane(self.image, vox, plane)
                im.set_data(image_data)
                changed_images.append(im)

        for plane, im in six.iteritems(images['mask']):
            if mask_changed or vox_round[planes[plane]] != images['last'][planes[plane]]:
                image_data = get_plane(self.mask, vox, plane)
                masked_data = np.ma.masked_where(image_data < 0.5, image_data)
                im.set_data(masked_data)
                changed_images.append(im)
        images['last'] = vox_round

        # ``six.iterkeys`` works on Python 2 and 3 (``dict.iterkeys`` is
        # Python 2 only). Guide lines get two-point sequences because
        # recent Matplotlib rejects scalar data.
        for plane in six.iterkeys(axes):
            near = get_hv(vox - margin, plane)
            far = get_hv(vox + margin, plane)
            lines['h'][plane].set_ydata([near['h'], near['h']])
            lines['v'][plane].set_xdata([far['v'], far['v']])
            lines['bl'][plane].set_xdata([near['v'], near['v']])
            lines['bt'][plane].set_ydata([far['h'], far['h']])

        # Dict views cannot be concatenated with ``+`` on Python 3.
        return changed_images + \
            list(lines['h'].values()) + list(lines['v'].values()) + \
            list(lines['bl'].values()) + list(lines['bt'].values())

    update_fn.next_pos_vox = current_vox
    update_fn.vox_queue = queue.Queue()
    update_fn.vox_queue.put(current_vox)

    def vox_gen():
        """Yield queued view positions until the queue is empty."""
        while not update_fn.vox_queue.empty():
            yield update_fn.vox_queue.get()

    ani = animation.FuncAnimation(fig, update_fn, frames=vox_gen(), interval=16, repeat=False, save_count=60*60)
    writer = animation.writers['ffmpeg'](fps=60)

    ani.save(movie_filename, writer=writer, dpi=dpi, savefig_kwargs={'facecolor': 'black'})

    return ani
def get_viewer(self, transpose=False):
    """Build a ``WrappedViewer`` showing the image, target and mask.

    Parameters
    ----------
    transpose : bool, optional
        If true, every volume is transposed and the configured resolution
        and seed voxel are passed as they are. If false, volumes are
        passed unchanged and the resolution and seed voxel are flipped.

    Returns
    -------
    WrappedViewer
        With an ``Image`` layer, a ``Mask Target`` layer when the region has
        a target, and a ``Mask Output`` layer.
    """
    if transpose:
        orient = np.transpose
        viewer = WrappedViewer(voxel_size=list(CONFIG.volume.resolution),
                               voxel_coordinates=self.pos_to_vox(self.seed_pos))
    else:
        def orient(volume):
            return volume
        viewer = WrappedViewer(voxel_size=list(np.flipud(CONFIG.volume.resolution)),
                               voxel_coordinates=np.flipud(self.pos_to_vox(self.seed_pos)))

    viewer.add(orient(self.image), name='Image')
    if self.target is not None:
        viewer.add(orient(self.target), name='Mask Target', shader=get_color_shader(0))
    viewer.add(orient(self.mask), name='Mask Output', shader=get_color_shader(1))
    return viewer
def render_body(self):
    """Show a Mayavi rendering of the seeded component of this region.

    The seeded connected component of the thresholded mask is drawn as a
    green iso-surface; if the region has a target it is drawn as a faint
    red iso-surface. ``mlab.show()`` is called at the end.
    """
    from mayavi import mlab

    component, _ = self.to_body().get_seeded_component(CONFIG.postprocessing.closing_shape)

    figure = mlab.figure(size=(1280, 720))

    def add_surface(volume, color, opacity):
        # Scalar field scaled by the configured resolution, contoured at 0.5.
        field = mlab.pipeline.scalar_field(volume)
        field.spacing = CONFIG.volume.resolution
        return mlab.pipeline.iso_surface(field, contours=[0.5], color=color, opacity=opacity)

    if self.target is not None:
        add_surface(self.target, (1, 0, 0), 0.1)
    add_surface(component, (0, 1, 0), 0.6)

    mlab.orientation_axes(figure=figure, xlabel='Z', zlabel='X')
    mlab.view(azimuth=45, elevation=30, focalpoint='auto', roll=90, figure=figure)
    mlab.show()
def fill_render(self, model, save_movie=True, **kwargs):
    """Fill the region while showing an orbiting Mayavi rendering.

    After every batch yielded by ``fill``, the mask surface is refreshed
    from the thresholded body and the camera azimuth is advanced for a
    couple of frames.

    Parameters
    ----------
    model : keras.models.Model
        Passed to ``fill``.
    save_movie : bool, optional
        If true, the scene's movie maker is set to record before the
        animation starts.
    **kwargs
        Passed to ``fill``.
    """
    from mayavi import mlab

    frames_per_move = 2
    fps = 60.0
    orbit_rate = 0.125
    spacing = np.flipud(CONFIG.volume.resolution)

    initial_mask = self.to_body().mask

    figure = mlab.figure(size=(1280, 720))

    if self.target is not None:
        target_field = mlab.pipeline.scalar_field(np.transpose(self.target))
        target_field.spacing = np.flipud(CONFIG.volume.resolution)
        mlab.pipeline.iso_surface(target_field, contours=[0.5], color=(1, 0, 0), opacity=0.1)

    mask_field = mlab.pipeline.scalar_field(np.transpose(initial_mask.astype(np.int32)))
    mask_field.spacing = spacing
    surface = mlab.pipeline.iso_surface(mask_field, color=(0, 1, 0), contours=[0.5], opacity=0.6)
    surface.actor.property.backface_culling = True
    # Source object whose scalars are replaced after every batch.
    surface_source = surface.mlab_source

    mlab.orientation_axes(figure=figure)
    mlab.view(azimuth=45, elevation=60, focalpoint='auto', figure=figure)

    fill_generator = self.fill(model, generator=True, **kwargs)

    @mlab.animate(delay=int(1000.0/fps), ui=True)
    def animate():
        try:
            for _, _ in fill_generator:
                current_mask = self.to_body().mask
                surface_source.set(scalars=np.transpose(current_mask.astype(np.int32)))

                for _ in range(frames_per_move):
                    camera = list(mlab.view(figure=figure))
                    camera[0] = (camera[0] + orbit_rate * 360.0 / fps) % 360.0
                    mlab.view(azimuth=camera[0], elevation=camera[1], focalpoint='auto')
                    figure.scene.render()
                    yield
        except Region.EarlyFillTermination:
            pass
        figure.scene.movie_maker.record = False
        figure.scene.movie_maker.animation_stop()

    if save_movie:
        figure.scene.movie_maker.record = True
    a = animate()  # noqa

    mlab.show()
def mask_to_output_target(mask):
    """Convert a mask into a float32 array of network target values.

    The result has the shape of ``mask`` and is filled with the configured
    ``v_false``; entries selected by indexing with ``mask`` are set to the
    configured ``v_true``.

    Parameters
    ----------
    mask : ndarray
        Mask used both as the shape template and as the index of the true
        entries.

    Returns
    -------
    ndarray of float32
    """
    model_config = CONFIG.model
    output = np.full_like(mask, model_config.v_false, dtype=np.float32)
    output[mask] = model_config.v_true
    return output