#!/usr/bin/env python2.1 """ An annotation tool for the MACEARS MDE annotation task Copyright (C) 2002-3 Linguistic Data Consortium, U Penn. Web: http://www.ldc.upenn.edu/; Email: ldc@ldc.upenn.edu For license information, see the file `LICENSE' included with the distribution. Author: Kazuaki Maeda """ PREPROCESS = 0 import sys import ag import string import agtk.agUtils import os import os.path import getopt import tempfile import agtk import webbrowser import mdeText import mdeAPI #import MultiListbox import re from Tkinter import * from agtk.agWsurf import * from tkFileDialog import * try: import Pmw except ImportError: Pmw = None print "This tool requires Pmw" sys.exit() AGAPPS_VERSION = '1.00' DEBUG = 0 #try: # agtkPath = agtk.__path__[0] #except: agtkPath = os.getcwd() def processCmdlineOpts(cmdOpts): """ Process command line options; return a hash that can be passed to the application. """ opts = {} useConf = None useSoundFile = None useAnnotationFile = None useType = None for i in cmdOpts: if re.match('--help', i): printUsage() sys.exit() elif re.match('--rev', i): opts['reverseChannels'] = 1 elif re.match('--conf', i): useConf = 1 elif re.match('--type', i): useType = 1 elif re.match('--speech', i): useSoundFile = 1 elif re.match('--sound', i): useSoundFile = 1 elif re.match('--ann', i): useAnnotationFile = 1 elif re.match('-', i): printUsage() sys.exit() elif useConf: opts['configFileName'] = i useConf = None elif useSoundFile: opts['soundFileName'] = i useSoundFile = None elif useType: opts['corpusType'] = i useType = None elif useAnnotationFile: opts['annotationFileName'] = i useAnnotationFile = None return opts def printUsage(): print "Usage:\n", sys.argv[0], \ """\\ --help: print this message --type corpusType --rev (reverse the channels) --sound soundFile (or --speech soundFile) --anno(tation) annotationFile""" def getNearestOffsetBefore(anc): try: offset = float(ag.GetAnchorOffset(anc)) except: offset = 0.0 if offset <= 0.0: annSet = ag.GetIncomingAnnotationSet(anc).split() try: s = ag.GetStartAnchor(annSet[0]) return getNearestOffsetBefore(s) except: return offset else: return offset def getNearestOffsetAfter(anc): try: offset = float(ag.GetAnchorOffset(anc)) except: offset = 0.0 if offset <= 0.0: annSet = ag.GetOutgoingAnnotationSet(anc).split() try: e = ag.GetEndAnchor(annSet[0]) return getNearestOffsetAfter(e) except: return offset else: return offset def getNearestOffsetByTypeBefore2(anc, type): annSet = ag.GetOutgoingAnnotationSet(anc).split() for a in annSet: if ag.GetAnnotationType(a) == type: return ag.GetAnchorOffset(anc) annSet = ag.GetIncomingAnnotationSet(anc).split() for a in annSet: anc1 = ag.GetStartAnchor(a) return getNearestOffsetByTypeBefore(anc1, type) return ag.GetAnchorOffset(anc) def getNearestOffsetByTypeAfter2(anc, type): annSet = ag.GetIncomingAnnotationSet(anc).split() for a in annSet: if ag.GetAnnotationType(a) == type: return ag.GetAnchorOffset(anc) annSet = ag.GetOutgoingAnnotationSet(anc).split() for a in annSet: anc1 = ag.GetEndAnchor(a) return getNearestOffsetByTypeAfter(anc1, type) return ag.GetAnchorOffset(anc) def getNearestOffsetByTypeBefore(anc, type): start = ag.GetAnchorOffset(anc) agId = ag.GetAGId(anc) for ann in ag.GetAnnotationSetByOffset(agId, start).split(): if ag.GetAnnotationType(ann) == type: return ag.GetStartOffset(ann) def getNearestOffsetByTypeAfter(anc, type): end = ag.GetAnchorOffset(anc) agId = ag.GetAGId(anc) for ann in ag.GetAnnotationSetByOffset(agId, end).split(): if ag.GetAnnotationType(ann) == type: return ag.GetEndOffset(ann) def getNearestOffsetByTypeBoth(anc, type): end = ag.GetAnchorOffset(anc) agId = ag.GetAGId(anc) for ann in ag.GetAnnotationSetByOffset(agId, end).split(): if ag.GetAnnotationType(ann) == type: return (ag.GetStartOffset(ann), ag.GetEndOffset(ann)) class MyMultiListbox(Frame): """ This class is a modified version of Python Cookbook Recipe 52266 """ def __init__(self, master, lists, annList, command): Frame.__init__(self, master) self.lists = [] self.annList = annList self.command = command for l,w in lists: frame = Frame(self); frame.pack(side=LEFT, expand=YES, fill=BOTH) Label(frame, text=l, borderwidth=1, relief=RAISED).pack(fill=X) lb = Listbox(frame, width=w, borderwidth=0, selectborderwidth=0, relief=FLAT, exportselection=FALSE) lb.pack(expand=YES, fill=BOTH) self.lists.append(lb) lb.bind('', lambda e, s=self: s._select(e.y)) lb.bind('', lambda e, s=self: s._selectB1(e.y)) lb.bind('', lambda e: 'break') lb.bind('', lambda e, s=self: s._b2motion(e.x, e.y)) lb.bind('', lambda e, s=self: s._button2(e.x, e.y)) frame = Frame(self); frame.pack(side=LEFT, fill=Y) Label(frame, borderwidth=1, relief=RAISED).pack(fill=X) sb = Scrollbar(frame, orient=VERTICAL, command=self._scroll) sb.pack(expand=YES, fill=Y) self.lists[0]['yscrollcommand']=sb.set def _select(self, y): row = self.lists[0].nearest(y) self.selection_clear(0, END) self.selection_set(row) return 'break' def _selectB1(self, y): row = self.lists[0].nearest(y) self.selection_clear(0, END) self.selection_set(row) try: ann = self.annList[row] except: ann = None self.command(ann) return 'break' def _button2(self, x, y): for l in self.lists: l.scan_mark(x, y) return 'break' def _b2motion(self, x, y): for l in self.lists: l.scan_dragto(x, y) return 'break' def _scroll(self, *args): for l in self.lists: apply(l.yview, args) def curselection(self): return self.lists[0].curselection() def delete(self, first, last=None): for l in self.lists: l.delete(first, last) def get(self, first, last=None): result = [] for l in self.lists: result.append(l.get(first,last)) if last: return apply(map, [None] + result) return result def index(self, index): self.lists[0].index(index) def insert(self, index, *elements): for e in elements: i = 0 for l in self.lists: l.insert(index, e[i]) i = i + 1 def size(self): return self.lists[0].size() def see(self, index): for l in self.lists: l.see(index) def selection_anchor(self, index): for l in self.lists: l.selection_anchor(index) def selection_clear(self, first, last=None): for l in self.lists: l.selection_clear(first, last) def selection_includes(self, index): return self.lists[0].selection_includes(index) def selection_set(self, first, last=None): for l in self.lists: l.selection_set(first, last) class mdeMain: """ This tool requires Wsurf (WaveSurfer), Pmw and AGLIB 1.0 'master' is the master Tk object. 'opts' is a dictionary containing information for any of the following keys: soundFileName, configFileName annotationFileName """ def __init__(self, master, opts={}): """ self._fileFormat self._currentStartPosition self._currentEndPosition self._currentAnnotation self._currentAnnotationFileName self._modified self.AGSetId self.timelineId self.AGIds And the following are used for GUI components: self.frame self.trans self.wsurf self.toolbar self.mbar """ # cts, bn or mtg self.corpusType = 'cts' try: tkPath = os.environ["AGTK_TK_DIR"] tkPathList = tkPath.split(':') tkPathList.reverse() for e in tkPathList: com='set auto_path [concat '+e+' $auto_path]' master.tk.eval(com) except: pass try: tkPath = os.environ["AGTK_TK_PATH"] tkPathList = tkPath.split(':') tkPathList.reverse() for e in tkPathList: com='set auto_path [concat '+e+' $auto_path]' master.tk.eval(com) except: pass try: master.tk.eval('package require wsurf 1.0') except TclError: print "This tool requires Wavesurfer 1.0." sys.exit() Pmw.initialise(fontScheme='pmw1') Pmw.aboutversion(0.1) Pmw.aboutcopyright('Copyright Linguistic Data Consortium 2003\n') Pmw.aboutcontact( 'For information about this application, see:\n'+ 'http://agtk.sourceforge.net/') self.master = master self.frame = Frame(master) self.frame.top = Frame(master) self.frame.bottom = Frame(master) self.frame.top.pack(expand=1, fill=BOTH) self.frame.bottom.pack(expand=1, fill=BOTH) self.frame.pack(expand=1, fill=BOTH) if opts.has_key('corpusType'): self.corpusType = opts['corpusType'] if self.corpusType == 'cts': self.numChannels = 2 elif self.corpusType == 'bn': self.numChannels = 1 self.trans = mdeText.mdeText(self.frame.top, eventProc=self.transEvent, numChannels=self.numChannels) self.trans.pack(expand=1, fill=BOTH) # self.mdeAPI = mdeAPI.mdeAG() # create a toolbar self.toolbar = agtk.agUtils.agToolbar(self.frame.top) self.toolbar.pack(expand=1, fill=X) ### create a wsurf (WaveSurfer) widget # 'Initialize' sets Info(Pref,linkFile) to 0 master.tk.call('::wsurf::Initialize') # set Info(Pref,linkFile) to 1 to use 'shape' files master.tk.call('set', '::wsurf::Info(Prefs,linkFile)', 1) self.wsurf = agWsurf(self.frame.bottom, collapser=0, icons='{stop}', state='expanded', eventProc = self.signalEvent) self.wsurf.pack(expand=1, fill=BOTH) # create main->client communication components self.trans.ac = agUtils.agClient(self.trans.newevent) self.wsurf.ac = agUtils.agClient(self.wsurf.newevent) # set instance variables to default values self._currentStartPosition = 0.00 self._currentEndPosition = 0.00 self._currentAnnotation = '' self._currentAnnotationFileName = StringVar() self.uninitAG() self._modified = None self._fileOpen = None self.token2segment = {} self.token2w = {} self.agId2speaker = {} #self.wsurf.bind_class('.', '', self.bindPlaySoloToggle) self.trans.bind_all('', self.bindPlaySoloToggle) # create a menu bar self.createMenubar(master) # fetch toolbar contents from each component self.AddToToolbar(self.toolbar) #self.trans.AddToToolbar(self.toolbar) self.wsurf.AddToToolbar(self.toolbar) Entry(self.toolbar).pack(side=LEFT, expand=1, fill=X) # for testing, load a sound file if DEBUG: self.wsurf.ac.loadFile(os.path.join('..', 'speech', 'sp1.wav')) if opts.has_key('soundFileName'): self.wsurf.ac.loadFile(opts['soundFileName']) if opts.has_key('reverseChannels'): self.reverseChannels = 1 else: self.reverseChannels = None if opts.has_key('annotationFileName'): fileFormat = 'AG' self.loadAnnotationFile(opts['annotationFileName'], fileFormat) try: self.speechDir = os.environ["AGTK_TABLETRANS_SPEECH_DIR"] except: try: self.speechDir = os.environ["AGTK_SPEECH_DIR"] except: #if os.path.exists(os.path.join(agtkPath, '..', '..', # 'speech')): # self.speechDir = os.path.normpath( # os.path.join(agtkPath, '..', '..', # 'speech')) #else: # self.speechDir = os.path.normpath( # os.path.join(os.getcwd(), '..', # 'speech')) self.speechDir = "." if self.speechDir and not os.environ.has_key("AGWSURF_OUT_SPEECH_DIR"): os.environ["AGWSURF_OUT_SPEECH_DIR"] = self.speechDir try: self.annotationDir = os.environ["AGTK_TABLETRANS_ANNOTATION_DIR"] except: try: self.annotationDir = os.environ["AGTK_ANNOTATION_DIR"] except: #if os.path.exists(os.path.join(agtkPath, '..', '..', # 'annotations')): # self.annotationDir = os.path.normpath( # os.path.join(agtkPath, '..', '..', # 'annotations')) #else: # self.annotationDir = os.path.normpath( # path.join(os.getcwd(), '..', # 'annotations')) self.annotationDir = "." try: self.configDir = os.environ["AGTK_TABLETRANS_CONFIG_DIR"] except: self.configDir = os.getcwd() self.limited = None ### end of __init__ ### event message handlers (callback functions) for the two ### components def transEvent(self, event): """ Callback function for the transcription widget""" if not event.has_key('Name'): return {} if DEBUG: print event['Name'] if event['Name'] == 'Play': # request from the transcription widget to start playing audio self.wsurf.ac.play(self._currentStartPosition, self._currentEndPosition) if event['Name'] == 'PlaySolo': self.bindPlaySolo() elif event['Name'] == 'Stop': # request to stop audio return {} elif event['Name'] == 'SetCurrentSpan': startAnn = event['StartAnnotationId'] endAnn = event['EndAnnotationId'] panel = event['Panel'] self._setCurrentSpan(startAnn, endAnn, panel) return elif event['Name'] == 'GetRegion': # Get the current region from the waveform. # This version explicitly send 'getRegion' to # the wsurf component each time. inmsg = self.wsurf.ac.getRegion('') if inmsg.has_key('StartPosition') and \ inmsg.has_key('EndPosition'): self._currentStartPosition = float(inmsg['StartPosition']) self._currentEndPosition = float(inmsg['EndPosition']) msg = {} msg['StartPosition'] = self._currentStartPosition msg['EndPosition'] = self._currentEndPosition return msg #elif event['Name'] == 'AddFilledPauseAnnotation': # start = event['StartAnchor'] # end = event['EndAnchor'] # self.mdeAPI.AddFilledPauseAnnotation(start, end) # self._modified = 1 # return elif event['Name'] == 'AddFilledPauseAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] difficult = event['DifficultDecision'] comment = event['Comment'] tokens = event['Tokens'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] annSet = self.mdeAPI.AddFilledPauseAnnotationSet( start, end, tokens, speakerAnn, difficult, comment) if annSet: self._currentAnnotation = annSet[-1] self._modified = 1 return annSet elif event['Name'] == 'AddUnclearAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] difficult = event['DifficultDecision'] comment = event['Comment'] tokens = event['Tokens'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] ann = self.mdeAPI.AddUnclearAnnotation(start, end, tokens, speakerAnn, difficult, comment) self._modified = 1 return ann elif event['Name'] == 'AddInterruptionPoint': #start = event['StartAnchor'] #end = event['EndAnchor'] tokenAnn = event['TokenAnnotationId'] ann = event['EDAnnotationId'] #self.mdeAPI.AddInterruptionPoint(start, end) self.mdeAPI.AddIPrightedge(ann, tokenAnn) self._modified = 1 return elif event['Name'] == 'AddDiscourseMarkerAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] difficult = event['DifficultDecision'] comment = event['Comment'] tokens = event['Tokens'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] ann = self.mdeAPI.AddDiscourseMarkerAnnotation( start, end, tokens, speakerAnn, difficult, comment) if ann: self._currentAnnotation = ann self._modified = 1 return ann elif event['Name'] == 'AddExplicitEditingTermAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] difficult = event['DifficultDecision'] comment = event['Comment'] tokens = event['Tokens'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] ann = self.mdeAPI.AddExplicitEditingTermAnnotation( start, end, tokens, speakerAnn, difficult, comment) if ann: self._currentAnnotation = ann self._modified = 1 return ann elif event['Name'] == 'AddAsideAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] difficult = event['DifficultDecision'] comment = event['Comment'] tokens = event['Tokens'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] ann = self.mdeAPI.AddAsideAnnotation( start, end, tokens, speakerAnn, difficult, comment) if ann: self._currentAnnotation = ann self._modified = 1 return ann elif event['Name'] == 'AddDePoDAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] difficult = event['DifficultDecision'] comment = event['Comment'] tokens = event['Tokens'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] ann = self.mdeAPI.AddDePoDAnnotation( start, end, tokens, speakerAnn, difficult, comment) if ann: self._currentAnnotation = ann self._modified = 1 return ann elif event['Name'] == 'AddQuestionableTranscriptionAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] tokens = event['Tokens'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] difficult = event['DifficultDecision'] comment = event['Comment'] ann = self.mdeAPI.AddQuestionableTranscriptionAnnotation( start, end, tokens, speakerAnn, difficult, comment) if ann: self._currentAnnotation = ann self._modified = 1 return ann elif event['Name'] == 'AddSUAnnotation': start = event['StartAnchor'] end = event['EndAnchor'] tokens = event['Tokens'] type = event['Type'] try: replace = event['Replace'] except: replace = None difficult = event['DifficultDecision'] comment = event['Comment'] speakerAnn = self.agId2speaker[ag.GetAGId(start)] ann = self.mdeAPI.AddSUAnnotation(start, end, type, tokens, speakerAnn, difficult, replace, comment) if ann: self._currentAnnotation = ann self._modified = 1 return ann elif event['Name'] == 'DeleteSUAnnotation': ann = event['AnnotationId'] speakerAnn = self.agId2speaker[ag.GetAGId(ann)] out = self.mdeAPI.DeleteSUAnnotation(ann, speakerAnn) #print out if out: self._modified = 1 return out elif event['Name'] == 'GetTokenText': ann = event['AnnotationId'] if ag.GetAnnotationType(ann) == "token": tokens = [(ann, None),] else: tokens = self.mdeAPI.GetWordList3(ann) out = [] for t, w in tokens: out.append(ag.GetFeature(t, 'text')) return out #anc1 = ag.GetStartAnchor(ann) #anc2 = ag.GetEndAnchor(ann) #return self._getTokenText(anc1, anc2) elif event['Name'] == 'GetAnnotationsInSpan': start = event['StartAnchor'] end = event['EndAnchor'] tokens = event['Tokens'] #annSet = self.mdeAPI.ExistsAnnotationsInSpan(start, # end) #annSet = self.mdeAPI.ExistsAnnotationsInSpan2(tokens) return self.mdeAPI.GetAnnotationsInSpan(tokens) elif event['Name'] == 'ExistsAnnotationTypeInSpan': type = event['Type'] start = event['StartAnchor'] end = event['EndAnchor'] tokens = event['Tokens'] annSet = self.mdeAPI.ExistsAnnotationTypeInSpan(start, end, type) return annSet elif event['Name'] == 'ExistsAnnotationTypeInSpanExclusive': type = event['Type'] start = event['StartAnchor'] end = event['EndAnchor'] tokens = event['Tokens'] annSet = self.mdeAPI.ExistsAnnotationTypeInSpanExclusive( start, end, type) return annSet elif event['Name'] == 'CreateAnnotation': # Request to create an annotation in the main script start = event['StartAnchor'] end = event['EndAnchor'] elif event['Name'] == 'DeleteAnnotation': # Request to delete an annotation if event['AnnotationId'] == '': return ann = event['AnnotationId'] type = event['Type'] speakerAnn = self.agId2speaker[ag.GetAGId(ann)] if type == 'filledPause': result = self.mdeAPI.DeleteFilledPauseAnnotation(ann, speakerAnn) elif type == 'discourseMarker': result = self.mdeAPI.DeleteDiscourseMarkerAnnotation(ann, speakerAnn) elif type == 'questionableTranscription': result = self.mdeAPI.DeleteQuestionableTranscriptionAnnotation(ann, speakerAnn) elif type == 'noRTMetadata': result = self.mdeAPI.DeleteUnclearAnnotation(ann, speakerAnn) pass elif type == 'explicitEditingTerm': result = self.mdeAPI.DeleteExplicitEditingTermAnnotation( ann, speakerAnn) pass elif type == 'aside': result = self.mdeAPI.DeleteAsideAnnotation( ann, speakerAnn) pass elif type == 'depod': result = self.mdeAPI.DeleteDePoDAnnotation(ann, speakerAnn) pass else: result = None if result: self._modified = 1 return result elif event['Name'] == 'CommitRegion': # Request to set start and/or end positions of the current # annotation in the main program if event.has_key('StartPosition') and \ event.has_key('EndPosition'): self._commitRegion(event['StartPosition'], event['EndPosition']) else: # The key is not listed above. print "fall through" return {} def signalEvent(self, event): """ Callback function for the wsurf widget """ if DEBUG: print event['Name'] if event['Name'] == 'RegionChanged': # Ignore RegionChanged in this tool --- instead, this script # uses GetRegion each time a reference to a region # in the waveform is necessary. pass elif event['Name'] == 'Stopping': # Not used for this tool pass elif event['Name'] == 'PlayStarted': # Not used for this tool pass elif event['Name'] == 'Playing': # Notifying that audio is playing. # 'Playing' is used for aligned play back. try: cur = float(event['CurrentPosition']) annSet = ag.GetAnnotationSetByOffset(self.AGIds[0], cur).split() ann = None for a in annSet: if ag.GetAnnotationType(a) == "segment": ann = a if ann: self.trans.SetCurrentLine(ann) self.trans.ac.setCurrentAnnotation(ann) self._updateCurrent(ann) self.wsurf.ac.setRegion(ann, self._currentStartPosition, self._currentEndPosition) except: return elif event['Name'] == 'GetTimeOffsetList': return self.returnStartAndEndTimes() elif event['Name'] == 'GetFeatureList': return self._featurelist elif event['Name'] == 'GetFeature': ann = event['AnnotationId'] feature = event['Feature'] return ag.GetFeature(ann, feature) elif event['Name'] == 'GetFilenameStem': fn = self._currentAnnotationFileName.get() stem = "" if fn != "": fn2 = os.path.basename(fn) m = re.match('^([^\.]*)', fn2) if m: try: stem = m.group(0) except: pass return stem else: if DEBUG: print "fall through" ### AG-related methods def uninitAG(self): """ Delete AGSet, Timeline and AG in order to load an AG file. """ try: ag.DeleteAGSet(self.AGSetId) except: pass self.AGSetId = "" self.timelineId = "" self.AGIds = [] self._currentStartPosition = 0.00 self._currentEndPosition = 0.00 self._currentAnnotation = '' self._currentAnnotationFileName.set('') ### Some helper methods def _floatToString(self, val, dig=3): """ Convert a floating-point number to a string with the specified number of digits after the point. E.g., 3.00000 -> 3.000, if dig is 3. """ format = '%0.'+str(dig)+'f' out = eval('format % float(val)') return out def _checkAnnotationId(self, id): """ check if annotation ID exists """ if id == '': return 0 if int(ag.ExistsAnnotation(id)): return 1 else: return 0 ### Methods to handle tasks to be done within the main (manager) ### script def _getTokenText(self, anc1, anc2): if anc1 == anc2: return () out = () annSet = ag.GetOutgoingAnnotationSet(anc1).split() for a in annSet: if ag.GetAnnotationType(a) == "token": text = ag.GetFeature(a, "text") out += (text,) anc3 = ag.GetEndAnchor(a) out += self._getTokenText(anc3, anc2) return out def _setCurrentSpan(self, startAnn, endAnn, panel): #self._currentStartPosition = getNearestOffsetBefore(startAnchor) #self._currentStartPosition = getNearestOffsetByTypeBefore2( # startAnchor, "segment") #self._currentEndPosition = getNearestOffsetByTypeAfter2( # endAnchor, "segment") #(self._currentStartPosition, # self._currentEndPosition) = getNearestOffsetByTypeBoth( # endAnchor, "segment") try: seg = self.token2segment[startAnn] self._currentStartPosition = ag.GetStartOffset(seg) except: return try: seg = self.token2segment[endAnn] self._currentEndPosition = ag.GetEndOffset(seg) except: return #print "set", self._currentStartPosition, self._currentEndPosition if self._currentEndPosition is None: if self._currentStartPosition: return self._currentEndPosition = self._currentStartPosition if self.corpusType == 'cts': if panel == self.trans.textList[0]: if not self.reverseChannels: self.wsurf.highlightChannel(0) else: self.wsurf.highlightChannel(1) elif panel == self.trans.textList[1]: if not self.reverseChannels: self.wsurf.highlightChannel(1) else: self.wsurf.highlightChannel(0) try: #print self._currentStartPosition, self._currentEndPosition self.wsurf.ac.setRegion("null", self._currentStartPosition, self._currentEndPosition) except: pass def _commitRegion(self, start, end): """ set start & end times of annotation with given values """ id = self._currentAnnotation if self._checkAnnotationId(id): ag.SetStartOffset(id, start) ag.SetEndOffset(id, end) self._modified = 1 def _compareTwoLists(self, list1, list2): """ Check if two lists (or tuples) contain the same elements. Return a tuple of (diff1, diff2) where: diff1 is a list of elements that are in list1 but not in list2, and diff2 is a list of elements that are in list2 but not in list1. """ diff1 = [] diff2 = [] for i in list1: if not i in list2: diff1.append(i) for i in list2: if not i in list1: diff2.append(i) return (diff1, diff2) def _updateCurrent(self, ann): #self.trans. pass def getAtlasChildren(self, ann): try: childrenStr = ag.GetFeature(ann, "_AtlasAnnChil_") except: return out = {} childrenList = string.split(childrenStr, " ; ") for child in childrenList: items = child.split(" ") ref = items[0] role = items[1] key = ref+" "+role annList = items[2:] out[key] = annList return out ### def returnStartAndEndTimes(self): """ Return start and end times of all sorted annotations in the following formart: (('3.234', '4.443'), ('12.344', '23.333'), ...) """ allAnn = ag.GetAnnotationSeqByOffset(self.AGId[0]) out = () for ann in string.split(allAnn): start = self._floatToString(ag.GetStartOffset(ann)) end = self._floatToString(ag.GetEndOffset(ann)) out = out + ((start, end, ann),) return out ### File I/O related methods ### methods with names that begin with 'bind' are bound to events def bindOpenAnnotationFileInAGXML(self, event=None): """ bound to the 'OpenAnnotationFileInAGXML' event """ if self._fileOpen: Pmw.MessageDialog(message_text=""" There is an open file --- please restart the tool work on a new file.""") return file = askopenfile(title="Open Annotation File in AG/AIF0", initialdir = self.annotationDir, filetypes=(("AG XML files", "*.ag.xml"), ("All files", "*"))) if file is not None: self.loadAnnotationFileInAGXML(file.name) self.annotationDir = os.path.dirname(file.name) def loadAnnotationFile(self, filename, format): """ load an annotation file """ if format == 'AG' or format == 'AIF0': self.loadAnnotationFileInAGXML(filename) def loadAnnotationFileInAGXML(self, filename): """ load an AG (AIF0) annotation file; delete the current AG Set, etc. before loading """ self.uninitAG() try: self.AGIds = ag.Load("AG", filename, "", {}, {"DTDvalidation":"false"}) except: d = Pmw.MessageDialog(title = "Open Failed", buttons = ("OK",), message_text = "Problem loading File: "+ filename + "\n") result = d.activate() return self.AGSetId = ag.GetAGSetId(self.AGIds[0]) if self.corpusType == 'cts': self.copyToTextWidgetAll() maxAn = 1 for agid in self.AGIds: maxAn1 = self.checkMaxAtlasAnnNum(agid) if maxAn1 > maxAn: maxAn = maxAn1 self.mdeAPI.SetMaxAtlasAnnNum(maxAn) #self.copyToTextWidget(0) #self.copyToTextWidget(1) elif self.corpusType == 'bn': self.copyToTextWidget(0) maxAn = 1 for agid in self.AGIds: maxAn1 = self.checkMaxAtlasAnnNum(agid) if maxAn1 > maxAn: maxAn = maxAn1 self.mdeAPI.SetMaxAtlasAnnNum(maxAn) else: print "Unknown File Type" sys.exit() self._fileFormat = "AIF0" self._fileOpen = 1 self._currentAnnotationFileName.set(filename) def checkMaxAtlasAnnNum(self, agId): max = 0 for ann in ag.GetAnnotationSeqByOffset(agId): #print ann try: atlasAnnId = ag.GetFeature(ann, "_AtlasAnnID_") except: continue numMatch = re.match("Ann(.*)$", atlasAnnId) if numMatch: num = numMatch.group(1) if max < int(num): max = int(num) return max def bindSaveAnnotationFile(self, event=None): """ bound to SaveAnnotationFile event """ saved = None currentFile = self._currentAnnotationFileName.get() if currentFile == "": Pmw.MessageDialog(message_text='File name not set.\n'+ 'Please use "Save Annotations As .." Menu Item') elif self._fileFormat is None: Pmw.MessageDialog(message_text='File format not selected.') else: if self._fileFormat == "AIF0": self.saveAnnotationFileInAGXML(currentFile) saved = 1 return saved def bindSaveAnnotationFileAsAGXML(self, event=None): """ bound to SaveAnnotationFileAsAGXML event """ filename = asksaveasfilename( title="Save Annotations in AIF0 File", initialdir= self.annotationDir, filetypes=(("AG XML files", "*.ag.xml"), ("All files", "*"))) if filename != '': self._fileFormat = "AIF0" self.saveAnnotationFileInAGXML(filename) self._currentAnnotationFileName.set(filename) self.annotationDir = os.path.dirname(filename) def saveAnnotationFileInAGXML(self, filename): """ save AG data to an AG XML file """ #for agid in self.AGIds: # self.mdeAPI.PropagateSUAnnotations(agid) try: f = open(filename, 'w') out = ag.toXML(self.AGSetId) f.write(out) f.close() self._modified = None except: d = Pmw.MessageDialog(title = "Save file failed", defaultbutton = 0, buttons =('OK',), message_text = "Save File Failed:") d.activate() def bindImportTransFile(self): if self._fileOpen: Pmw.MessageDialog(message_text=""" There is an open file --- please re-start the tool work on a new file.""") return file = askopenfile(title="Import Transcription File", initialdir = self.annotationDir, filetypes=(("txt files", "*.txt"), ("All files", "*"))) if file is not None: self.importTransFile(file.name) self.annotationDir = os.path.dirname(file.name) self._fileOpen = 1 def importTransFile(self, filename): self.mdeAPI.Initialize() self.mdeAPI.LoadTranscriptionFile(filename) self.AGSetId = self.mdeAPI.AGSetId self.AGIds = self.mdeAPI.AGIds #print ag.toXML(self.mdeAPI.AGSetId) self.copyToTextWidget(0) self.copyToTextWidget(1) def copyToTextWidgetAll(self): if self.corpusType == 'cts': index = 0 annList = [] for agId in self.AGIds: annList.append(self.mdeAPI.GetAnnotationSegmentList(agId)) index += 1 try: self.agId2speaker[agId] = ag.GetAnnotationSet(agId, "speaker")[0] except: pass (segList0, segList1) = self.matchTwoSegLists( annList[0], annList[1]) if len(segList0) != len(segList1): print "segList Doesn't match" line = 0 for ann0 in segList0: height0 = None height1 = None if ann0: self.trans.textList[0].segmentDict[ann0] = line #self.copySegmentToTextWidget(0, ann0) height0 = self.copySegmentToTextWidget( self.trans.textList[0], ann0, self.token2segment, self.token2w) #else: # self.trans.textList[0].InsertEmptyLine() ann1 = segList1[line] if ann1: self.trans.textList[1].segmentDict[ann1] = line #self.copySegmentToTextWidget(1, ann1) height1 = self.copySegmentToTextWidget( self.trans.textList[1], ann1, self.token2segment, self.token2w) #else: # self.trans.textList[1].InsertEmptyLine() line += 1 if height0 and height1 is None: self.trans.textList[1].InsertEmptyLine(height0) if height1 and height0 is None: self.trans.textList[0].InsertEmptyLine(height1) self.processAnnotations(0) self.processAnnotations(1) def matchTwoSegLists(self, list1, list2): list1Out = [] list2Out = [] i1 = 0 i2 = 0 while 1: if i1 < len(list1): s1 = list1[i1] else: if i2 < len(list2): s2 = list2[i2] list1Out.append(None) list2Out.append(s2) i2 += 1 continue else: break if i2 < len(list2): s2 = list2[i2] else: if i1 < len(list1): s1 = list1[i1] list1Out.append(s1) list2Out.append(None) i1 += 1 continue else: break t1 = ag.GetStartOffset(s1) t2 = ag.GetStartOffset(s2) if t1 < t2: list1Out.append(s1) list2Out.append(None) i1 += 1 elif t1 > t2: list1Out.append(None) list2Out.append(s2) i2 += 1 else: list1Out.append(s1) list2Out.append(s2) i1 += 1 i2 += 1 return (list1Out, list2Out) def copyToTextWidget(self, index): if self.corpusType == 'cts': annSet = self.mdeAPI.GetAnnotationSegmentList( self.AGIds[index]) else: annSet = self.mdeAPI.GetAnnotationSegmentListAll(self.AGSetId) for agid in self.AGIds: try: self.agId2speaker[agid] = ag.GetAnnotationSet(agid, "speaker")[0] except: pass #except: pass #print annSet line = 0 lastAG = ag.GetAGId(annSet[0]) for ann in annSet: self.trans.textList[index].segmentDict[ann] = line #self.copySegmentToTextWidget(index, ann) self.copySegmentToTextWidget(self.trans.textList[index], ann, self.token2segment, self.token2w) if lastAG != ag.GetAGId(ann): self.trans.textList[index].InsertEmptyLine(10) lastAG = ag.GetAGId(ann) line += 1 self.processAnnotations(index) def copySegmentToTextWidget(self, textWidget, ann, token2segment, token2w): # move code to mdeText s = ag.GetStartAnchor(ann) e = ag.GetEndAnchor(ann) #wordList = self.mdeAPI.GetWordList(s, e) wordList = self.mdeAPI.GetWordList3(ann) if not wordList or wordList == (): return agId = ag.GetAGId(ann) lastThreeWords = [] w = None firstW = None #textWidget.tag_delete("currentline") #startPos = textWidget.index(END) for (ann2, word) in wordList: #self.token2segment[ann2] = ann token2segment[ann2] = ann if PREPROCESS: self.preprocessFilledPauses(ann2) start = ag.GetStartAnchor(ann2) end = ag.GetEndAnchor(ann2) cat = ag.GetFeature(ann2, "category") #if re.match("^%", word): test = re.match("^~(.*)$", word) if test: word = test.group(1) test = re.match("^\^(.*)$", word) if test: word = test.group(1) if re.match("vocalNoise", cat): #w = self.trans.textList[index].InsertWord( w = textWidget.InsertWord( word, ann2, start, end, agId, cat, END, fg='green') elif re.match("nonvocalNoise", cat): #w = self.trans.textList[index].InsertWord( w = textWidget.InsertWord( word, ann2, start, end, agId, cat, END, fg='green') else: #w = self.trans.textList[index].InsertWord( w = textWidget.InsertWord( word, ann2, start, end, agId, cat) #self.token2w[ann2] = w token2w[ann2] = w if not firstW: firstW = w if len(lastThreeWords) == 0: lastThreeWords = [(word, w),] else: lastThreeWords = [(word, w),] + lastThreeWords[:2] self.processFontColor(lastThreeWords) #if re.match("filledPause", cat): # #span = self.trans.textList[index].GetSpan(start, # span = textWidget.GetSpan(start, # end) # if span: # self.trans.UnderlineSpan("filledPause", span) for a3 in ag.GetIncomingAnnotationSet(end): if ag.GetAnnotationType(a3) == "SU": try: type = ag.GetFeature(a3, "type") except: w.addSlash(a3, "black") break if type == 'statement': w.addSlash(a3, "blue", "/. ") elif type == 'question': w.addSlash(a3, "red", "/? ") elif type == 'backchannel': w.addSlash(a3, "black", "/@ ") elif type == 'incomplete': w.addSlash(a3, "brown", "/- ") elif type == 'clausal': w.addSlash(a3, "#008000", "/, ") elif type == 'coordinating': w.addSlash(a3, "#000080", "/& ") else: print "unknown SU type" #endPos = textWidget.index(END) #textWidget.tag_add("currentline", startPos, endPos) #print 'index', startPos, endPos #self.trans.textList[index].InsertNewLine() textWidget.InsertNewLine() textWidget.updateAndSee() if firstW and w: top = textWidget.getYInPixel(firstW) bottom = textWidget.getYBottomInPixel(w) if top is not None and bottom is not None: height = bottom - top return height else: return None def processAnnotations(self, index): self.copyAnnotationsToText("filledPause", index, self.trans.textList[index], self.token2w, 1, addStarLeft = 1) self.copyAnnotationsToText("discourseMarker", index, self.trans.textList[index], self.token2w, 1, addStarLeft = 1) self.copyAnnotationsToText("aside", index, self.trans.textList[index], self.token2w, 1, addStarLeft = 1) self.copyAnnotationsToText("explicitEditingTerm", index, self.trans.textList[index], self.token2w, 1, addStarLeft = 1) self.copyAnnotationsToText("depod", index, self.trans.textList[index], self.token2w, 1) self.copyAnnotationsToText("questionableTranscription", index, self.trans.textList[index], self.token2w, 1) self.copyAnnotationsToText("noRTMetadata", index, self.trans.textList[index], self.token2w, 1) self.trans.FocusFirstToken() def preprocessFilledPauses(self, token): text = ag.GetFeature(token, "text") if text == "uh" or \ text == "um" or \ text == "er" or \ text == "eh" or \ text == "erm" or \ text == "ah" or \ text == "Uh" or \ text == "Um" or \ text == "Er" or \ text == "Eh" or \ text == "Erm" or \ text == "Ah": agId = ag.GetAGId(token) speakerAnn = self.agId2speaker[agId] ann = self.mdeAPI.AddFilledPauseAnnotation( ag.GetStartAnchor(token), ag.GetEndAnchor(token), (token,), speakerAnn, 0, "") def processFontColor(self, tokens): # filled pauses if tokens[0][0] == "uh" or \ tokens[0][0] == "um" or \ tokens[0][0] == "er" or \ tokens[0][0] == "eh" or \ tokens[0][0] == "erm" or \ tokens[0][0] == "ah" or \ tokens[0][0] == "Uh" or \ tokens[0][0] == "Um" or \ tokens[0][0] == "Er" or \ tokens[0][0] == "Eh" or \ tokens[0][0] == "Erm" or \ tokens[0][0] == "Ah": tokens[0][1].setFg('blue') return # discourse markers if tokens[0][0] == "actually" or \ tokens[0][0] == "anyway" or \ tokens[0][0] == "basically" or \ tokens[0][0] == "like" or \ tokens[0][0] == "now" or \ tokens[0][0] == "see" or \ tokens[0][0] == "well" or \ tokens[0][0] == "Actually" or \ tokens[0][0] == "Anyway" or \ tokens[0][0] == "Basically" or \ tokens[0][0] == "Like" or \ tokens[0][0] == "Now" or \ tokens[0][0] == "See" or \ tokens[0][0] == "Well": tokens[0][1].setFg('red') return try: if tokens[1][0] == "i" and tokens[0][0] == "mean" or \ tokens[1][0] == "I" and tokens[0][0] == "mean" or \ tokens[1][0] == "you" and tokens[0][0] == "know" or \ tokens[1][0] == "You" and tokens[0][0] == "know" or \ tokens[1][0] == "you" and tokens[0][0] == "see" or \ tokens[1][0] == "You" and tokens[0][0] == "see" or \ tokens[1][0] == "let's" and tokens[0][0] == "see" or \ tokens[1][0] == "Let's" and tokens[0][0] == "see": tokens[1][1].setFg('red') tokens[0][1].setFg('red') return except: pass try: if (tokens[2][0] == "let's" and \ tokens[1][0] == "see" and \ tokens[0][0] == "now") or \ (tokens[2][0] == "let's" and \ tokens[1][0] == "see" and \ tokens[0][0] == "now"): tokens[2][1].setFg('red') tokens[1][1].setFg('red') tokens[0][1].setFg('red') return except: pass # backchannels if tokens[0][0] == "uh-huh" or \ tokens[0][0] == "yeah" or \ tokens[0][0] == "yes" or \ tokens[0][0] == "yep" or \ tokens[0][0] == "mm-hm" or \ tokens[0][0] == "mm-hmm" or \ tokens[0][0] == "hmm" or \ tokens[0][0] == "hm" or \ tokens[0][0] == "right" or \ tokens[0][0] == "oh" or \ tokens[0][0] == "sure" or \ tokens[0][0] == "okay" or \ tokens[0][0] == "really" or \ tokens[0][0] == "huh" or \ tokens[0][0] == "Uh-huh" or \ tokens[0][0] == "Yeah" or \ tokens[0][0] == "Yes" or \ tokens[0][0] == "Yep" or \ tokens[0][0] == "Mm-hm" or \ tokens[0][0] == "Mm-hmm" or \ tokens[0][0] == "Hmm" or \ tokens[0][0] == "Hm" or \ tokens[0][0] == "Right" or \ tokens[0][0] == "Oh" or \ tokens[0][0] == "Sure" or \ tokens[0][0] == "Okay" or \ tokens[0][0] == "Really" or \ tokens[0][0] == "Huh": tokens[0][1].setFg('#007700') return def grayOutChildren(self, ann, token2w): atlasChildren = self.getAtlasChildren(ann) try: tokens = atlasChildren["token token"] except: return for i in tokens: token2w[i].fgGray() def copyAnnotationsToText(self, type, index, textWidget, token2w, main = None, altType=None, **kw): # note: move code to mdeText if self.corpusType == 'cts': annSet = ag.GetAnnotationSet(self.AGIds[index], type) else: annSet = [] #print self.AGIds, self.AGSetId for agId in self.AGIds: #print agId annSet += ag.GetAnnotationSet(agId, type) for ann in annSet: if type == "depod": atlasChildren = self.getAtlasChildren(ann) try: interrupts = atlasChildren["token IPrightedge"] except: interrupts = [] for i in interrupts: token2w[i].addStar() wl = self.mdeAPI.GetWordList3(ann) wla = [] for w in wl: wla.append(w[0]) span = textWidget.GetSpan2(wla) if main: self.trans.tagAnnotationSpanDict[ann] = span #print type, index, altType, ann, span if span: if not altType: self.trans.UnderlineSpan(type, span) else: self.trans.UnderlineSpan(altType, span) if kw.has_key("addStarLeft"): span[0].addStarLeft() else: pass # UNCOMMENT THE FOLLOWING WHEN DEBUGGING #print "Something wrong", type, index, altType, ann, span def bindNewAnnotationWidget(self, event=None): # ask the user if he/she wants to save the current annotation # table if self._modified: result = self.saveAnnotationDialog() if result == 'Cancel': return result if result == 'Not Saved': return result # reset the table self.trans.ac.clearAllAnnotations() # reset the AG #self.uninitAG() #self.initAG() self._modified = None return 'New' def saveAnnotationDialog(self): d = Pmw.MessageDialog(title = "Save Current Annotations in File?", defaultbutton = 0, buttons =('Yes', 'No', 'Cancel'), message_text = "Save Current Annotations?") result = d.activate() if result == "Yes": if self.bindSaveAnnotationFile(): result = "Saved" else: result = "Not Saved" return result def bindOpenSoundFile(self, event=None): """ bound to OpenSoundFile event """ #try: file = askopenfile(title="Open Sound File", initialdir= self.speechDir, filetypes=(("WAV files", "*.wav"), ("AU files", "*.au"), ("AIFF files", "*.aiff"), ("All files", "*"))) if file is not None: self.wsurf.ac.loadFile(file.name) self.speechDir = os.path.dirname(file.name) #except: # d = Pmw.MessageDialog(title = "Open file failed", # defaultbutton = 0, # buttons =('OK',), # message_text = "Open Sound File Failed") # d.activate() #def bindCloseSoundFile(self, event=None): # self.wsurf.ac.closeFile() def _checkSaveAndExit(self): if self._modified: result = self.saveAnnotationDialog() # if annotations have been saved (return value 'Saved') or # if the user doesn't want to save it (return value 'No') if result == 'Saved' \ or result == 'No': pass else: return self.master.quit() ### Menu-related methods def bindMenuRefresh(self): self.trans.ac.clearAllAnnotations() return def createMenubar(self, master): """ create a menu bar """ self.mbar = agUtils.agMenubar(master) self.mbar.file = self.mbar.createMenu('File') #self.mbar.file.add_command(label='New', # command=self.bindNewAnnotationWidget) # self.mbar.file.openAnnotationFile = Menu(self.mbar.file, tearoff=0) #self.mbar.file.add_cascade(label='Open Annotation File', # menu=self.mbar.file.openAnnotationFile) self.mbar.file.openAnnotationFile.add_command( label='Open Annotation File (in AG XML)', command=self.bindOpenAnnotationFileInAGXML) self.mbar.file.add_command(label='Open Annotation File', command=self.bindOpenAnnotationFileInAGXML) #self.mbar.file.add('separator') #self.mbar.file.add_command(label='Import Transcription File', # command=self.bindImportTransFile) self.mbar.file.add('separator') self.mbar.file.add_command(label='Open Sound File', command=self.bindOpenSoundFile) self.mbar.file.add('separator') #self.mbar.file.add_command(label='Refresh Annotation Table', # command=self.bindMenuRefresh) self.mbar.file.add_command(label='Save', command=self.bindSaveAnnotationFile) self.mbar.file.saveAnnotationsAs = Menu(self.mbar.file, tearoff=0) self.mbar.file.add_cascade(label='Save Annotations As ...', menu=self.mbar.file.saveAnnotationsAs) self.mbar.file.saveAnnotationsAs.add_command( label='Save Annotations As ... (in AG Format)', command=self.bindSaveAnnotationFileAsAGXML) self.mbar.file.add('separator') self.mbar.file.add_command(label='Validate Data', command=self.validateAll) self.mbar.file.add('separator') self.mbar.file.add_command(label='Exit', command=self._checkSaveAndExit) # import menu items from the trans object self.trans.AddToMenu(self.mbar) #self.wsurf.AddToMenu(self.mbar) self.mbar.file = self.mbar.createMenu('Edit') self.mbar.file.add_command(label='Add Annotation (Return or Right Mouse Button)', command=self.trans.CreateTagDialog) self.mbar.file.add('separator') self.mbar.file.add_command(label='Delete Annotation (Ctl-d)', command=self.trans.DeleteTagDialog) self.mbar.file.add('separator') self.mbar.file.add_command(label='SU Annotation Window', command=self.trans.ToSUAnnotationMode) self.mbar.file.add('separator') self.mbar.file.add_command(label="Show Rendered Text (All Channels)", command=self.showFilteredText) self.mbar.file.add_command(label="Show Selected Annotations (Selected Channel)", command=self.showLimitedAnnotations) self.mbar.file.add_command(label="Show List of Annotations (Selected Channel)", command=self.showAnnotationList) #self.mbar.file.add('separator') #self.mbar.file.add_command(label="Run NIST's Validator ", # command=self.runRTValidator) #self.mbar.file.add_command(label="Run NIST's Renderer ", # command=self.runRTRenderer) self.mbar.help = self.mbar.createMenu('Help') #self.mbar.help.add_command(label='About', command=Pmw.AboutDialog) #self.mbar.help.add_command( # label='See Annotation Guidelines (View with browser)', # command=self.showAnnotationGuidelinesHtml) #self.mbar.help.add_command( # label='See Annotation Guidelines (Text format)', # command=self.showAnnotationGuidelinesText) self.mbar.help.add_command( label='See Tool Manual (Text format)', command=self.showManualText) def runRTValidator(self): tmpfile1 = tempfile.mktemp('.ag.xml') try: f = open(tmpfile1, 'w') out = ag.toXML(self.AGSetId) f.write(out) f.close() except: return tmpfile2 = tempfile.mktemp('.txt') command = "/pkg/ldc/projects/mde/scripts3/agRTValidator " command += tmpfile1 + "> " + tmpfile2 text = os.system(command) d = Pmw.TextDialog(self.master, buttons=("Close",), defaultbutton="Close", title='Output of RTValidator') d._text.importfile(tmpfile2) d.configure(text_state='disabled') os.unlink(tmpfile1) os.unlink(tmpfile2) def runRTRenderer(self): tmpfile1 = tempfile.mktemp('.ag.xml') try: f = open(tmpfile1, 'w') out = ag.toXML(self.AGSetId) f.write(out) f.close() except: return tmpfile2 = tempfile.mktemp('.txt') command = "/pkg/ldc/projects/mde/scripts3/agRTRenderer " command += tmpfile1 + "> " + tmpfile2 text = os.system(command) d = Pmw.TextDialog(self.master, buttons=("Close",), defaultbutton="Close", title='Output of RTValidator') d._text.importfile(tmpfile2) d.configure(text_state='disabled') os.unlink(tmpfile1) os.unlink(tmpfile2) pass def agIdToSpeakerCode(self, id): codes = ('A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z') for index in range(len(self.AGIds)): if id == self.AGIds[index]: try: return codes[index] except: return '?' return '?' def insertFilteredText(self, d, segments): tokens = [] agId = "" #print segments for seg in segments: #print seg wl = self.mdeAPI.GetWordList3(seg) if not wl: continue for w in wl: tokens.append(w[0]) if not tokens or tokens == []: return #if not lines or lines == []: # return #tokens = self.mdeAPI.GetTokensFromSpeaker(speaker) text = "" skip = None #keepEditedPortion = None punc = None indent = "" lineText = {} agId = None for t in tokens: e = ag.GetEndAnchor(t) if punc: sc = self.agIdToSpeakerCode(agId) text += sc+ ": " + lineText[agId] + punc+"\n\n" lineText[agId] = indent punc = None test = mdeAPI.checkIncomingAnnotations(t, "SU") if test: type = ag.GetFeature(test, "type") if type == "statement": punc = "/." indent = "" elif type == "question": punc = "/?" indent = "" elif type == "backchannel": punc = "/@" indent = "" elif type == "incomplete": punc = "/-" indent = "" elif type == "clausal": punc = "/," indent = "\t" elif type == "coordinating": punc = "/&" indent = "\t" else: punc = "!" indent = "" if skip == e: skip = None continue if skip: continue cat = ag.GetFeature(t, 'category') if re.match("vocalNoise", cat) or \ re.match("nonvocalNoise", cat) or \ re.match("filledPause", cat): continue tokenText = ag.GetFeature(t, 'text') if tokenText == "." or \ tokenText == "," or \ tokenText == "?": continue test = mdeAPI.checkOutgoingAnnotations(t, "explicitEditingTerm") if test: skip = ag.GetEndAnchor(test) if skip == e: skip = None continue test = mdeAPI.checkOutgoingAnnotations(t, "aside") if test: skip = ag.GetEndAnchor(test) if skip == e: skip = None continue test = mdeAPI.checkOutgoingAnnotations(t, "depod") if test: skip = ag.GetEndAnchor(test) if skip == e: skip = None continue test = mdeAPI.checkOutgoingAnnotations(t, "discourseMarker") if test: skip = ag.GetEndAnchor(test) if skip == e: skip = None continue test = mdeAPI.checkOutgoingAnnotations(t, "filledPause") if test: skip = ag.GetEndAnchor(test) if skip == e: skip = None continue if skip: continue else: agId = ag.GetAGId(t) try: lineText[agId] += ag.GetFeature(t, "text") + " " except: lineText[agId] = ag.GetFeature(t, "text") + " " startTime = ag.GetStartOffset(t) d.settext(text) def showFilteredText(self, event=None): annSet = self.mdeAPI.GetAnnotationSegmentListAll(self.AGSetId) #if self.corpusType == 'cts': # if self.trans.currentText == self.trans.textList[0]: # index = 0 # elif self.trans.currentText == self.trans.textList[1]: # index = 1 # annSet = self.mdeAPI.GetAnnotationSegmentList( # self.AGIds[index]) #else: # annSet = self.mdeAPI.GetAnnotationSegmentListAll(self.AGSetId) d = Pmw.TextDialog(self.master, buttons=("Close",), defaultbutton="Close", title='Rendered Text') d.configure(text_state='disabled') #d._text._hull.configure(font=('Helvetica', '10')) #self.insertFilteredText(d, speaker) self.insertFilteredText(d, annSet) def showLimitedAnnotations(self, event=None): if self.limited: try: self.limited.destroy() except: pass self.limited = None if self.trans.currentText == self.trans.textList[0]: agId = self.AGIds[0] elif self.trans.currentText == self.trans.textList[1]: agId = self.AGIds[1] typeList = ( ("filledPause", 1), ("Discourse Marker", 2), #("editDisfluency (All types)", 3), #("editDisfluency (Restart --- Both keep & discard)", 4), ("Explicit Editing Term", 4), ("Aside", 5), ("DePoD", 6), #("Edit Disfluency (Restart - Keep)", 5), #("Edit Disfluency (Restart - Discard)", 6), #("Edit Disfluency (Repetition)", 7), #("Edit Disfluency (Revision)", 8), #("Edit Disfluency (Complex)", 9), ("Skip/noRTMetadata", 10), ("Questionable Transcription", 11)) d = Pmw.Dialog(self.master, buttons=("Open", "Close",), defaultbutton="Open", title="Select Annotations to Display") self.limited = d f1 = Frame(d.interior()) f1.pack(fill=BOTH, expand=1) g11 = Pmw.Group(f1, tag_text="Types") g11.pack(fill=BOTH, expand=1, side=LEFT) typeVar = IntVar() for type, val in typeList: setattr(typeVar, type, IntVar()) Checkbutton(g11.interior(), text=type, anchor=W, variable=getattr(typeVar, type) ).pack(fill=BOTH, expand=1) result = d.activate() if result == "Close": return typeList1 = [] for type, val in typeList: if getattr(typeVar, type).get(): typeList1.append((type, val)) d2 = Pmw.Dialog(self.master, buttons=("Close",), defaultbutton="Close", title="Selected Annotations") frame = Frame(d2.interior()) frame.pack(expand=1, fill=BOTH) text = mdeText.ReadOnlyText(frame) text.pack(expand=1, fill=BOTH) self.refreshLimitedAnnotations(text.text, typeList1) def refreshLimitedAnnotations(self, textWidget, typeList): typeDict = {} for type, value in typeList: typeDict[value] = 1 if self.trans.currentText == self.trans.textList[0]: index = 0 elif self.trans.currentText == self.trans.textList[1]: index = 1 if self.corpusType == 'cts': annSet = self.mdeAPI.GetAnnotationSegmentList( self.AGIds[index]) else: annSet = self.mdeAPI.GetAnnotationSegmentListAll(self.AGSetId) for agid in self.AGIds: try: self.agId2speaker[agid] = ag.GetAnnotationSet(agid, "speaker")[0] except: pass #except: pass #print annSet line = 0 token2segment = {} token2w = {} for ann in annSet: textWidget.segmentDict[ann] = line self.copySegmentToTextWidget(textWidget, ann, token2segment, token2w) line += 1 if typeDict.has_key(1): self.copyAnnotationsToText("filledPause", index, textWidget, token2w, None, addStarLeft = 1) if typeDict.has_key(2): self.copyAnnotationsToText("discourseMarker", index, textWidget, token2w, None, addStarLeft = 1) if typeDict.has_key(4): self.copyAnnotationsToText("explicitEditingTerm", index, textWidget, token2w, None, addStarLeft = 1) if typeDict.has_key(5): self.copyAnnotationsToText("aside", index, textWidget, token2w, None, addStarLeft = 1) if typeDict.has_key(6): self.copyAnnotationsToText("depod", index, textWidget, token2w, None, "depod") #if typeDict.has_key(7): # self.copyAnnotationsToText("editDisfluency", index, # textWidget, # token2w, None, # "editDisfluency0", # editDisfluencyType="repetition") #if typeDict.has_key(8): # self.copyAnnotationsToText("editDisfluency", index, # textWidget, # token2w, None, # "editDisfluency0", # editDisfluencyType="revision") #if typeDict.has_key(9): # self.copyAnnotationsToText("editDisfluency", index, # textWidget, # token2w, None, # "editDisfluency0", # editDisfluencyType="complex") if typeDict.has_key(10): self.copyAnnotationsToText("noRTMetadata", index, textWidget, token2w, None) if typeDict.has_key(11): self.copyAnnotationsToText("questionableTranscription", index, textWidget, token2w, None) def showAnnotationList(self, event=None): if self.corpusType == 'cts': if self.trans.currentText == self.trans.textList[0]: agIds = [self.AGIds[0],] elif self.trans.currentText == self.trans.textList[1]: agIds = [self.AGIds[1],] else: agIds = self.AGIds try: columnList = self.showList_columnList except: columnList = (("Type", "type", 20, 1), ("Tokens", "tokens", 30, 1), ("Difficult Dicision", "difficultToAnnotate", 6, 1), ("Comments", "comment", 30, 1), ("Annotation ID", "annotationId", 25, 0), ("Children IDs", "ChildrenIDs", 60, 0), ("Atlas ID", "AtlasID", 10, 0), ("Token IDs", "TokenIDs", 40, 0), ("Atlas Token IDs", "AtlasTokenIDs", 30, 0), ("Start Anchor", "StartAnchor", 20, 0), ("End Anchor", "EndAnchor", 20, 0), ("Start Offset", "StartOffset", 7, 0), ("End Offset", "EndOffset", 7, 0), ) try: typeList = self.showList_typeList except: typeList = (("Filled Pause", "filledPause", 1), ("Discourse Marker", "discourseMarker", 1), ("Explicit Editing Term", "explicitEditingTerm", 1), ("DePoD", "depod", 1), ("Aside", "aside", 1), ("Skip/NoRTMetadata", "noRTMetadata", 0), ("Questionable Transcription", "questionableTranscription", 0) #"SU" ) d = Pmw.Dialog(self.master, buttons=("Show", "Cancel"), defaultbutton="Show", title="Choose Annotation Types and Features to Display") f1 = Frame(d.interior()) f1.pack(fill=BOTH, expand=1) g11 = Pmw.Group(f1, tag_text="Types") g11.pack(fill=BOTH, expand=1, side=LEFT) g12 = Pmw.Group(f1, tag_text="Features") g12.pack(fill=BOTH, expand=1, side=LEFT) typeVar = IntVar() for type1, type2, select in typeList: setattr(typeVar, type2, IntVar()) tmp = Checkbutton(g11.interior(), text=type1, anchor=W, variable=getattr(typeVar, type2) ) if select: tmp.select() tmp.pack(fill=BOTH, expand=1) colVar = IntVar() for col1, col2, width, select in columnList: setattr(colVar, col2, IntVar()) tmp = Checkbutton(g12.interior(), text=col1, anchor=W, variable=getattr(colVar, col2)) if select: tmp.select() tmp.pack(fill=BOTH, expand=1) result = d.activate() if result == "Cancel": return typeList1 = [] newTypeList = [] for type1, type2, select in typeList: if getattr(typeVar, type2).get(): typeList1.append((type1, type2)) newTypeList.append((type1, type2, 1)) else: newTypeList.append((type1, type2, 0)) columnList1 = [] columnList2 = [] newColumnList = [] for col1, col2, width, select in columnList: if getattr(colVar, col2).get(): columnList1.append((col2, width)) columnList2.append((col1, width)) newColumnList.append((col1, col2, width, 1)) else: newColumnList.append((col1, col2, width, 0)) self.showList_columnList = newColumnList self.showList_typeList = newTypeList #print typeList1, columnList1 if len(typeList1) == 0 or len(columnList1) == 0: return annDictList = [] for agId in agIds: annDictList += self.getAnnotations(agId, typeList1) annDictList.sort(self.cmpByStartOffsetOfItem) out = [] out2 = [] for annDict in annDictList: item = [] for col, width in columnList1: try: item.append(annDict[col]) #print "\t"+annDict[col], except: item.append("") #print "\t" #print "\n" out.append(item) out2.append(annDict['annotationId']) d = Pmw.Dialog(self.master, buttons=("Close",), defaultbutton="Close", title="Annotation List") mlb = MyMultiListbox( d.interior(), columnList2, out2, self.gotoSpan) mlb.pack(expand=YES,fill=BOTH) for item in out: mlb.insert(END, item) def gotoSpan(self, ann): if not ann: return tokens = [] wl = self.mdeAPI.GetWordList3(ann) if not wl: return for w in wl: tokens.append(w[0]) ws = [] for token in tokens: ws.append(self.token2w[token]) #print ws self.trans.FocusWs(ws) def cmpByStartOffsetOfItem(self, a, b): ao = ag.GetStartOffset(a['annotationId']) bo = ag.GetStartOffset(b['annotationId']) if ao > bo: return 1 elif ao < bo: return -1 else: #if a['type2'] != 'editDisfluency' and \ # b['type2'] == 'editDisfluency': # return 1 #if a['type2'] == 'editDisfluency' and \ # b['type2'] != 'editDisfluency': # return -1 return 0 def getAnnotations(self, agId, typeList): out = [] for type1, type2 in typeList: for ann in ag.GetAnnotationSet(agId, type2): item = {} item["annotationId"] = ann wl = self.mdeAPI.GetWordList3(ann) tokenStr = "" tokenIDs = "" atlasTokenIDs = "" for w in wl: tokenStr += w[1]+" " tokenIDs += w[0]+" " try: atlasTokenIDs += ag.GetFeature( w[0], "_AtlasAnnID_")+" " except: pass item["type"] = type1 item["type2"] = type2 item["tokens"] = tokenStr try: item["difficultToAnnotate"] = ag.GetFeature( ann, "difficultToAnnotate") except: pass try: item["comment"] = ag.GetFeature( ann, "comment") except: pass try: item["ChildrenIDs"] = ag.GetFeature( ann, "_AtlasAnnChil_") except: pass try: item["AtlasID"] = ag.GetFeature( ann, "_AtlasAnnID_") except: pass try: item["TokenIDs"] = tokenIDs except: pass try: item["AtlasTokenIDs"] = atlasTokenIDs except: pass try: item["StartAnchor"] = ag.GetStartAnchor(ann) except: pass try: item["EndAnchor"] = ag.GetEndAnchor(ann) except: pass try: item["StartOffset"] = ag.GetStartOffset(ann) except: pass try: item["EndOffset"] = ag.GetEndOffset(ann) except: pass out.append(item) return out def showAnnotationGuidelinesHtml(self, event=None): webbrowser.open("file:"+os.path.abspath( os.path.join(agtkPath,'AnnotationGuidelines.html'))) return def showAnnotationGuidelinesText(self, event=None): d = Pmw.Dialog() st = Pmw.ScrolledText(d.interior(), borderframe=1, labelpos=N, hull_width=400, hull_height=300, text_padx=10, text_pady=10, text_wrap='none') #st.importfile(os.path.abspath(os.path.join( # agtkPath, # 'AnnotationGuidelines.txt'))) st.pack(fill=BOTH, expand=1) def showManualText(self, event=None): try: d = Pmw.Dialog() st = Pmw.ScrolledText(d.interior(), borderframe=1, labelpos=N, hull_width=400, hull_height=300, text_padx=10, text_pady=10, text_wrap='none') st.importfile(os.path.abspath( os.path.join(agtkPath, 'mdeToolManual.txt'))) st.pack(fill=BOTH, expand=1) except: pass ### Toolbar related methods def AddToToolbar(self, toolbar): """ add items specific to the main script to the toolbar """ try: #Label(toolbar, text='Annotation File').pack(side=LEFT, fill=X) Entry(toolbar, width=30, textvariable=self._currentAnnotationFileName, state=DISABLED).pack(side=LEFT, fill=X) except: return def bindPlaySoloToggle(self, e=None): if self.numChannels == 1: self.wsurf.bindPlayToggle() return if self.trans.currentText == self.trans.textList[0]: if not self.reverseChannels: self.wsurf.highlightChannel(0) self.wsurf.bindPlayToggle1() else: self.wsurf.highlightChannel(1) self.wsurf.bindPlayToggle2() elif self.trans.currentText == self.trans.textList[1]: if not self.reverseChannels: self.wsurf.highlightChannel(1) self.wsurf.bindPlayToggle2() else: self.wsurf.highlightChannel(0) self.wsurf.bindPlayToggle1() def bindPlaySolo(self, e=None): if self.numChannels == 1: self.wsurf.play() return if self.trans.currentText == self.trans.textList[0]: if self.reverseChannels: self.wsurf.play1() else: self.wsurf.play2() elif self.trans.currentText == self.trans.textList[1]: if self.reverseChannels: self.wsurf.play2() else: self.wsurf.play1() def validateAll(self, e=None): """ Run validator functions on the data """ out = "" out += "Checking Discourse Marker Annotations...\n\n" for agId in self.AGIds: for ann in ag.GetAnnotationSet(agId, "discourseMarker"): wl = self.mdeAPI.GetWordList3(ann) validate = self.mdeAPI.ValidateDiscourseMarkers(ann) if not validate: out += "There is a discourseMarker annotation that is not on the list.\n" tokenStr = "" for w in wl: tokenStr += w[1]+" " out += tokenStr out += "\n" out += "Checking Backchannels...\n\n" for agId in self.AGIds: for ann in ag.GetAnnotationSet(agId, "SU"): if ag.GetFeature(ann, "type") == "backchannel": wl = self.mdeAPI.GetWordList3(ann) validate = self.mdeAPI.ValidateBackchannels(ann) if not validate: out += "There is a backchannel SU that is not on the list\n" tokenStr = "" for w in wl: tokenStr += w[1]+" " out += tokenStr out += "\n" out += "Checking SUs...\n\n" for agId in self.AGIds: annSet = self.mdeAPI.GetAnnotationSegmentList(agId) for ann in annSet: wl = self.mdeAPI.GetWordList3(ann) validate = self.mdeAPI.ValidateSegmentAndSU(ann) if not validate and len(wl) == 1: out += "There is no SU assigned for the following segment: Is this OK?\n" tokenStr = "" for w in wl: tokenStr += w[1]+" " out += tokenStr out += "\n" #for ann in ag.GetAnnotationSet(agId, # "turn"): # wl = self.mdeAPI.GetWordList3(ann) # validate = self.mdeAPI.ValidateTurnAndSU(ann) # if not validate: # print ann # print wl # print "There is no SU assigned for turn" # else: # print ann # print wl # print "There is SU assigned for turn" d = Pmw.Dialog() st = Pmw.ScrolledText(d.interior(), borderframe=1, labelpos=N, hull_width=400, hull_height=300, text_padx=10, text_pady=10, text_wrap='none') st.insert(END, out) st.pack(fill=BOTH, expand=1) ### End of class agEarsTrans class ListViewAnnotations(Frame): def __init__(self, master, columnList): Frame.__init__(self, master) self.mlb = MultiListbox.MultiListbox( self, columnList) self.mlb.pack(expand=YES,fill=BOTH) Button(self, text="Go to", command=self.goto).pack(expand=YES,fill=BOTH) def goto(self): pass if __name__ == "__main__": opts = processCmdlineOpts(sys.argv) root = Tk() root.title("MDE Annotation Tool") main = mdeMain(root, opts) root.mainloop()