-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvirtualcoke.py
executable file
·364 lines (278 loc) · 11.2 KB
/
virtualcoke.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
#!/usr/bin/env python
# vim: set tabstop=4 shiftwidth=4 expandtab ai
import npyscreen
from datetime import datetime
# npyscreen twisted reactor
import npyscreenreactor
reactor = npyscreenreactor.install()
# Incorporates code from
#Pymodbus Asynchronous Server Example
#---------------------------------------------------------------------------#
# import the various server implementations
#---------------------------------------------------------------------------#
from pymodbus.constants import Defaults
from pymodbus.transaction import ModbusSocketFramer, ModbusAsciiFramer
from pymodbus.server.async import ModbusServerFactory
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusSparseDataBlock
from pymodbus.datastore import ModbusSlaveContext, ModbusServerContext
# for emulator code
import os
import sys
import string
# Ascii Coke Logo
# by Normand Veilleux
# in cokelogo.txt
##---------------------------------------------------------------------------#
## configure the service logging
##---------------------------------------------------------------------------#
import logging
# Write log records to the file virtualcoke.log, opened with file mode "a+".
logging.basicConfig(filename="virtualcoke.log", filemode="a+")
# Module-wide logger; getLogger() without a name returns the root logger.
log = logging.getLogger()
# Let everything from DEBUG level upwards through.
log.setLevel(logging.DEBUG)
# Factory for modbus
def StartModbusAsyncServer(context, identity=None, address=None, console=False):
    ''' Helper method to start the Modbus Async TCP server

    Builds a ModbusServerFactory using the socket framer and registers it
    with the Twisted reactor.  The reactor itself is not started here.

    :param context: The server data context
    :param identity: The server identity to use (default empty)
    :param address: An optional (interface, port) to bind to.
                    Defaults to all interfaces on Defaults.Port.
    :param console: A flag indicating if you want the debug console
    '''
    from twisted.internet import reactor

    address = address or ("", Defaults.Port)
    framer = ModbusSocketFramer
    factory = ModbusServerFactory(context, framer, identity)
    if console:
        # InstallManagementConsole is not imported at module level, so the
        # call used to fail with NameError; import it where it is needed.
        from pymodbus.internal.ptwisted import InstallManagementConsole
        InstallManagementConsole({'factory': factory})
    reactor.listenTCP(address[1], factory, interface=address[0])
    # Pass the parts separately so a list-style address works as well as a
    # tuple and formatting is deferred to the logging framework.
    log.info("Starting Modbus TCP Server on %s:%s", address[0], address[1])
#
class ContainedMultiSelect(npyscreen.BoxTitle):
    '''BoxTitle variant whose contained widget class is TitleMultiSelect.'''

    _contained_widget = npyscreen.TitleMultiSelect
class CokeButtonPress(npyscreen.MiniButtonPress):
    '''Mini button that reports a press through a keyword-style callback.

    :param screen: The screen/form passed through to the base class
    :param when_pressed_function: Accepted in the signature but neither
                                  stored nor forwarded to the base class
    :param when_pressed_callback: Optional callable, invoked from
                                  whenPressed as ``callback(widget=self)``
    '''

    def __init__(self, screen, when_pressed_function=None, when_pressed_callback=None, *args, **keywords):
        super(CokeButtonPress, self).__init__(screen, *args, **keywords)
        self.when_pressed_callback = when_pressed_callback

    def whenPressed(self, key=None):
        '''Invoke the stored callback, if one was supplied.'''
        callback = self.when_pressed_callback
        if not callback:
            return
        callback(widget=self)
class VirtualCoke(npyscreen.Form):
    '''Main form of the emulator.

    Shows the ASCII logo, a row of seven "sold out" LEDs, a row of keypad
    labels (0-9 and M), a row of ten "Empty?" checkboxes, and date, status
    and info lines.  Slots 0-5 each drive their own LED; slots 6-9 share
    LED 6, which only reads "[SOLD]" when all four of them are ticked.
    '''

    def while_waiting(self):
        '''Refresh the clock, status and info lines from the parent app.'''
        self.date_widget.value = datetime.now().ctime()
        self.statusfield.value = self.parentApp.status
        self.infofield.value = self.parentApp.info
        self.display()

    @staticmethod
    def _column_for(index):
        '''Return the relx column used for LED/button/slot ``index``.

        Indices 0-6 run right to left, 7 columns apart, starting at
        column 52.  Indices 7 and up are placed 4 columns apart starting
        at column 63.
        '''
        if index < 7:
            return ((7 * 7) + 3) - (index * 7)
        return (8 * 7) + ((index - 6) * 4) + 3

    def create(self, *args, **keywords):
        '''Build every widget on the form and register the key handlers.'''
        super(VirtualCoke, self).create(*args, **keywords)

        # The logo lives next to the script that was launched.
        logofile = os.path.join(os.path.dirname(os.path.realpath(sys.argv[0])), "cokelogo.txt")
        # Context manager so the file handle is closed once the text is read.
        with open(logofile) as handle:
            logotext = handle.read()

        self.cokelogo = []
        cly = 1
        for line in logotext.splitlines():
            widget = self.add(npyscreen.FixedText, value=line, editable=False, rely=cly)
            cly = cly + 1
            self.cokelogo.append(widget)

        # LEDS: one row directly below the logo
        self.leds = []
        ly = cly
        for led in range(0, 7):
            widget = self.add(npyscreen.FixedText, value="[SOLD]", editable=False,
                              relx=self._column_for(led), rely=ly)
            self.leds.append(widget)

        # Buttons: digit labels, each digit key bound to the keypad handler
        self.buttons = []
        by = ly + 1
        for button in range(0, 10):
            bx = self._column_for(button)
            widget = self.add(npyscreen.FixedText, value=str(button), editable=False, relx=bx, rely=by)
            self.add_handlers({"%d" % button: self.parentApp.when_keypad_pressed})
            self.buttons.append(widget)

        # Manual: sits 3 columns right of the last digit label
        bx = bx + 3
        widget = self.add(npyscreen.FixedText, value="M", editable=False, relx=bx, rely=by)
        self.add_handlers({"M": self.parentApp.when_keypad_pressed})
        self.add_handlers({"m": self.parentApp.when_keypad_pressed})
        self.buttons.append(widget)

        # Slots: one checkbox per slot, named after the slot number
        self.slots = []
        sy = by + 1
        self.add(npyscreen.FixedText, value="Empty?", editable=False, relx=1, rely=sy)
        for slot in range(0, 10):
            widget = self.add(npyscreen.CheckboxBare, name=str(slot),
                              relx=self._column_for(slot), rely=sy,
                              value_changed_callback=self.parentApp.when_empty_toggled)
            self.slots.append(widget)
            # Bring the LED in line with the checkbox's initial value.
            self.update_sold(slot, widget.value)

        self.date_widget = self.add(npyscreen.FixedText, value=datetime.now().ctime(), editable=False, rely=19)
        # Placeholder text; while_waiting() replaces it with the current time.
        self.date_widget.value = "Hello"
        self.add_handlers({"^Q": self.exit_application})

        self.statusfield = self.add(npyscreen.TitleText, name="Status:", value="", editable=False, rely=20)
        self.infofield = self.add(npyscreen.TitleText, name="Info:", value="", editable=False)

    def exit_application(self, name):
        '''Key handler: tell the app there is no next form and stop editing.'''
        self.parentApp.setNextForm(None)
        self.editing = False

    def get_slot_status(self):
        '''Return the checkbox values of all slots, in slot order.'''
        return [slot.value for slot in self.slots]

    def update_sold(self, slot, value):
        '''Update the LED belonging to ``slot`` after its checkbox changed.

        :param slot: The slot number whose checkbox changed
        :param value: The checkbox's new value (truthy means ticked)
        '''
        self.parentApp.do_info("Slot %d toggled" % slot)
        if value:
            if slot < 6:
                self.leds[slot].value = "[SOLD]"
            # coke slot magic: LED 6 lights only when slots 6-9 are all ticked
            elif all(self.slots[index].value for index in range(6, 10)):
                self.leds[6].value = "[SOLD]"
        else:
            if slot < 6:
                self.leds[slot].value = "[ ]"
            else:
                self.leds[6].value = "[ ]"
class VirtualCokeApp(npyscreen.StandardApp):
    '''Application object: owns the main form and the status/info strings.

    The form reads ``status`` and ``info`` in its while_waiting hook; the
    do_status/do_info methods are how other code updates them.
    '''

    keypress_timeout_default = 1

    def onStart(self):
        '''Create the main form, then reset the status and info texts.'''
        # initialise virtual coke machine
        self.F = self.addForm("MAIN", VirtualCoke, name="Virtual Coke - ^Q to exit")
        self.status = ""
        self.info = "in onStart"

    def while_waiting(self):
        '''Nothing to do at application level.'''
        pass

    def onCleanExit(self):
        '''Nothing to clean up.'''
        pass

    # Coke Emulator comms below

    def do_status(self, data):
        '''Replace the text shown on the status line.'''
        self.status = data

    def do_info(self, data):
        '''Replace the text shown on the info line.'''
        self.info = data

    # Callbacks

    def get_slot_status(self):
        '''Return the slot checkbox values as reported by the main form.'''
        return self.F.get_slot_status()

    def when_empty_toggled(self, *args, **keywords):
        '''Checkbox callback: report the change and update the LEDs.

        The checkbox that changed arrives as the ``widget`` keyword.
        '''
        # See
        # https://code.google.com/p/npyscreen/source/detail?r=9768a97fd80ed1e7b3e670f312564c19b1adfef8#
        # for callback info
        toggled = keywords['widget']
        summary = " ".join([toggled.name, str(self.F.get_slot_status())])
        self.do_status(summary)
        self.F.update_sold(int(toggled.name), toggled.value)
        self.F.display()

    def when_reset_pressed(self, *args, **keywords):
        '''Callback: post the keypress status text and clear the widget.'''
        self.do_status('211 keypress\n')
        pressed = keywords['widget']
        pressed.value = False
        self.F.display()

    def when_keypad_pressed(self, *args, **keywords):
        '''Key handler: show the pressed key (first positional argument).'''
        key = args[0]
        self.do_status("%c" % key)
        self.F.display()
# Coke Emulator code below
#---------------------------------------------------------------------------#
# initialize your data store
#---------------------------------------------------------------------------#
# The datastores only respond to the addresses that they are initialized to.
# Therefore, if you initialize a DataBlock to addresses of 0x00 to 0xFF, a
# request to 0x100 will respond with an invalid address exception. This is
# because many devices exhibit this kind of behavior (but not all)::
#
# block = ModbusSequentialDataBlock(0x00, [0]*0xff)
#
# Continuing, you can choose to use a sequential or a sparse DataBlock in
# your data context. The difference is that the sequential has no gaps in
# the data while the sparse can. Once again, there are devices that exhibit
# both forms of behavior::
#
# block = ModbusSparseDataBlock({0x00: 0, 0x05: 1})
# block = ModbusSequentialDataBlock(0x00, [0]*5)
#
# Alternately, you can use the factory methods to initialize the DataBlocks
# or simply do not pass them to have them initialized to 0x00 on the full
# address range::
#
# store = ModbusSlaveContext(di = ModbusSequentialDataBlock.create())
# store = ModbusSlaveContext()
#
# Finally, you are allowed to use the same DataBlock reference for every
# table or you may use a separate DataBlock for each table. This depends
# if you would like functions to be able to access and modify the same data
# or not::
#
# block = ModbusSequentialDataBlock(0x00, [0]*0xff)
# store = ModbusSlaveContext(di=block, co=block, hr=block, ir=block)
#---------------------------------------------------------------------------#
#---------------------------------------------------------------------------#
# create your custom data block with callbacks
#---------------------------------------------------------------------------#
class CallbackDataBlock(ModbusSparseDataBlock):
    ''' A datablock whose reads are answered from the emulator state

    Status reads report the inverted "Empty?" checkbox of a slot; dispense
    reads post a message on the app's info line and return a value that
    cycles with an internal counter.
    '''

    def __init__(self, values, app=None):
        ''' Initialise the block

        :param values: Initial values handed to the sparse data block
        :param app: Object providing get_slot_status() and do_info()
        '''
        self.toggle = 1
        self.app = app
        super(CallbackDataBlock, self).__init__(values)

    def getValues(self, address, count=1):
        ''' Returns the emulated value for one address

        :param address: The address being read
        :param count: The number of values requested (only logged)
        :returns: A single-element list
        '''
        drop_base = 1024 + 1
        status_base = 16 + 1

        # get status of slots from form
        slots = self.app.get_slot_status()

        # anything below the drop base is treated as a status read
        if address < drop_base:
            log.debug("Status Read getValues %d:%d" % (address, count))
            slot_index = address - status_base
            if not 0 <= slot_index < len(slots):
                return [0]
            # invert the checkbox value
            return [not slots[slot_index]]

        log.debug("Dispense Read getValues %d:%d" % (address, count))
        slot_index = address - drop_base
        if slot_index >= len(slots):
            return [0]

        stamp = datetime.now().ctime()
        self.app.do_info("At %s, Dispense slot %d ...... bzzzztclunk ..." % (stamp, slot_index))

        # every third dispense read (counter % 3 == 2) answers 0, others 1
        self.toggle += 1
        return [0] if self.toggle % 3 == 2 else [1]
def modbus_setup(app):
    ''' Build the Modbus server context backed by callback data blocks

    :param app: The application object handed to every CallbackDataBlock
    :returns: A ModbusServerContext wrapping a single slave context
    '''
    discrete_inputs = CallbackDataBlock([0]*100, app=app)
    coils = CallbackDataBlock([0]*65536, app=app)
    holding_registers = CallbackDataBlock([0]*100, app)
    input_registers = CallbackDataBlock([0]*100, app)

    slave = ModbusSlaveContext(
        di=discrete_inputs,
        co=coils,
        hr=holding_registers,
        ir=input_registers)
    return ModbusServerContext(slaves=slave, single=True)
if __name__ == "__main__":
    # Raised by listenTCP when the port cannot be bound.
    from twisted.internet.error import CannotListenError

    App = VirtualCokeApp()
    context = modbus_setup(App)
    reactor.registerNpyscreenApp(App)
    try:
        # First try the default Modbus port.
        StartModbusAsyncServer(context)
    except CannotListenError:
        # Only a failed bind triggers the fallback; a bare except here
        # would also swallow KeyboardInterrupt and SystemExit.
        log.warning("Could not listen on the default Modbus port, "
                    "falling back to port 1502", exc_info=True)
        StartModbusAsyncServer(context, address=("", 1502))
    reactor.run()