Scorpion Vision Software can send images from one application instance to another using shared memory.
The SMImageProvider class
The following constructor creates an SMImageProvider object:
- SMImageProvider(shareName, width, height, bytesPerPixel = 1)
Method | Returns | Description
Process() | None | Processes requests from the shared memory client. Set up a scheduler to call this method periodically.
Write(image) | None | Sends the image to the shared memory client.
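As a rough guide to how much shared memory a given constructor call implies, the sketch below multiplies the three size arguments. It assumes the image is stored as a tightly packed width x height x bytesPerPixel buffer with no header or row padding, which is not stated here; `shared_buffer_size` is a hypothetical helper and not part of the SharedMemory module.

```python
def shared_buffer_size(width, height, bytes_per_pixel=1):
    """Return the byte count of a tightly packed image buffer (assumed layout)."""
    if width <= 0 or height <= 0 or bytes_per_pixel <= 0:
        raise ValueError("width, height and bytes_per_pixel must be positive")
    return width * height * bytes_per_pixel


# Sizes for a 760 x 570 image at 1, 4 and 20 bytes per pixel.
mono_size = shared_buffer_size(760, 570, 1)
color_size = shared_buffer_size(760, 570, 4)
cloud_size = shared_buffer_size(760, 570, 20)
```

Under that assumption, the provider and the client must agree on all three values, since a mismatch would change the expected buffer size.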
The SMImageClient class
The following constructor creates an SMImageClient object:
- SMImageClient(shareName, width, height, bytesPerPixel = 1)
Property | Access | Type | Description
state | R | SMImageClient.State | The connection state: Connected or Disconnected.
Method | Returns | Description
Connect() | None | Connects to the shared memory.
Read() | Image matrix | Reads the image matrix from the shared memory. Returns None if there is no image.
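The client interface above suggests a simple polling pattern: read on every timer tick, treat None as "no image yet", and reconnect when the state reports Disconnected. The sketch below illustrates that pattern with `StubClient`, a hypothetical stand-in that only mimics the documented `state`, `Connect()` and `Read()` members; it is not the real SMImageClient and says nothing about how the real class behaves internally.

```python
class StubClient(object):
    """Hypothetical stand-in that mimics the documented SMImageClient surface."""
    DISCONNECTED, CONNECTED = "Disconnected", "Connected"

    def __init__(self, frames):
        self.state = self.DISCONNECTED
        self._frames = list(frames)  # queued frames; None means "no image yet"

    def Connect(self):
        self.state = self.CONNECTED

    def Read(self):
        # Nothing can be read while disconnected or when the queue is empty.
        if self.state != self.CONNECTED or not self._frames:
            return None
        return self._frames.pop(0)


def poll_once(client):
    """One timer tick: try to read, then reconnect if the link is down."""
    image = client.Read()
    if client.state == client.DISCONNECTED:
        client.Connect()  # a real Connect() may raise; wrap in try/except if needed
    return image


client = StubClient(frames=[None, "frame-1"])
results = [poll_once(client) for _ in range(3)]
```

The first tick only reconnects, the second finds no image, and the third returns the queued frame, so a caller should expect several empty reads before the first image arrives.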
How images are transferred
- Connection setup
  - The image provider creates the named shared memory.
  - The image provider creates the named pipe with the name '\\.\pipe\sharename'.
  - The client connects to the named pipe.
  - The client requests the image size by writing the data packet:
  - The image provider responds with the data packet:
    - uint value: 2
    - uint value: image width
    - uint value: image height
    - uint value: bytes per pixel
- Image transfer
  - The image provider copies the image to the named shared memory.
  - The image provider notifies the client that the image is ready by writing the data packet to the named pipe:
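The size response and the shared memory copy described in the steps above can be sketched in standalone Python 3 (3.8 or later), outside Scorpion. Several things here are assumptions: each "uint value" is taken to be a 32-bit little-endian unsigned integer, Python's `multiprocessing.shared_memory` stands in for the named shared memory, and the named pipe is replaced by passing the packet bytes directly. The client's request packet and the image-ready notification are left out. The helper names are hypothetical.

```python
import struct
from multiprocessing import shared_memory

# Assumed wire format: four little-endian 32-bit unsigned integers.
SIZE_RESPONSE = struct.Struct("<4I")


def pack_size_response(width, height, bytes_per_pixel):
    """Build the provider's reply: the value 2, then width, height, bytes per pixel."""
    return SIZE_RESPONSE.pack(2, width, height, bytes_per_pixel)


def unpack_size_response(packet):
    """Decode the reply on the client side into (width, height, bytes_per_pixel)."""
    tag, width, height, bytes_per_pixel = SIZE_RESPONSE.unpack(packet)
    if tag != 2:
        raise ValueError("unexpected packet tag: %d" % tag)
    return width, height, bytes_per_pixel


# Connection setup: the client learns the image geometry from the reply.
packet = pack_size_response(4, 3, 1)
width, height, bpp = unpack_size_response(packet)
size = width * height * bpp

# Image transfer: the provider creates a shared block and copies the image in.
provider_block = shared_memory.SharedMemory(create=True, size=size)
image = bytes(range(size))  # dummy 4 x 3 monochrome image
provider_block.buf[:size] = image

# The client attaches to the same block by name and copies the pixels out.
client_block = shared_memory.SharedMemory(name=provider_block.name)
received = bytes(client_block.buf[:size])

# Release the mappings, then remove the block itself.
client_block.close()
provider_block.close()
provider_block.unlink()
```

Only small control packets travel through the pipe in this scheme, while the pixel data stays in the shared block, which is presumably why the two mechanisms are combined.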
Test and Sample Profiles
Example 1: Using SMImageProvider to send a monochrome image
# Central Start
import SharedMemory
# SharedMemory.SMImageProvider(shareName, width, height, bytesPerPixel = 1)
# Create image provider to send monochrome image
sm = SharedMemory.SMImageProvider('test', 760, 570, 1)
# Initialize the flag so SMProcess can report the first successful call
processRunning = False

# SMProcess
def SMProcess():
    # Call this function from trigger every 100ms
    import datetime
    global processRunning
    try:
        sm.Process()
        if processRunning == False:
            print datetime.datetime.now(), "Process running"
            processRunning = True
    except:
        pass

# SendImage
def SendImage():
    try:
        sm.Write(GetImageMatr('Image'))
    except Exception, msg:
        print msg
Example 2: Using SMImageClient to get a monochrome image
# Central Start
import SharedMemoryClient
smc2d = None
smc2d = SharedMemoryClient.SMImageClient("test", 760, 570, 1)
if smc2d.state == SharedMemoryClient.SMImageClient.State.Disconnected:
    smc2d.Connect()

# ReceiveImage
def ReceiveImage():
    # Call this function from trigger every 100ms
    if smc2d != None:
        obj = smc2d.Read()
        if obj != None and str(type(obj)) == "":
            SetImageMatr('Image', obj)
        if smc2d.state == SharedMemoryClient.SMImageClient.State.Disconnected:
            try:
                # try to reconnect. This is needed if read actually detected a disconnect
                smc2d.Connect()
            except:
                pass
Example 3: Using SMImageProvider to send a color image
# Central Start
import SharedMemory
# SharedMemory.SMImageProvider(shareName, width, height, bytesPerPixel = 1)
# Create image provider to send color image
sm = SharedMemory.SMImageProvider('test', 760, 570, 4)
# Initialize the flag so SMProcess can report the first successful call
processRunning = False

# SMProcess
def SMProcess():
    # Call this function from trigger every 100ms
    import datetime
    global processRunning
    try:
        sm.Process()
        if processRunning == False:
            print datetime.datetime.now(), "Process running"
            processRunning = True
    except:
        pass

# SendImage
def SendImage():
    try:
        sm.Write(GetImageMatr('Image'))
    except Exception, msg:
        print msg
Example 4: Using SMImageClient to get a color image
# Central Start
import SharedMemoryClient
smc2d = None
smc2d = SharedMemoryClient.SMImageClient("test", 760, 570, 4)
if smc2d.state == SharedMemoryClient.SMImageClient.State.Disconnected:
    smc2d.Connect()

# ReceiveImage
def ReceiveImage():
    # Call this function from trigger every 100ms
    if smc2d != None:
        obj = smc2d.Read()
        if obj != None and str(type(obj)) == "":
            SetImageMatr('Image', obj)
        if smc2d.state == SharedMemoryClient.SMImageClient.State.Disconnected:
            try:
                # try to reconnect. This is needed if read actually detected a disconnect
                smc2d.Connect()
            except:
                pass
Example 5: Using SMImageProvider to send a 3D image
# Central Start
import SharedMemory
from random import random
# SharedMemory.SMImageProvider(shareName, width, height, bytesPerPixel = 1)
# Create image provider to send 3D image
sm = SharedMemory.SMImageProvider('test', 760, 570, 20)
# Initialize the flag so SMProcess can report the first successful call
processRunning = False

# SMProcess
def SMProcess():
    # Call this function from trigger every 100ms
    import datetime
    global processRunning
    try:
        sm.Process()
        if processRunning == False:
            print datetime.datetime.now(), "Process running"
            processRunning = True
    except:
        pass

# SendImage
def SendImage():
    try:
        # build 3D image (xyzwvfMat is assumed to be available in the script namespace)
        height = 100
        width = 100
        cloud = xyzwvfMat(height, width)
        maxZ = random() * 50
        midHeight = height/2
        midWidth = width/2
        for r in range(height):
            for c in range(width):
                # z rises quadratically from the edges towards the middle
                zr = ((midHeight - abs(midHeight - r)) / float(midHeight))
                zr = zr * zr * maxZ
                zc = ((midWidth - abs(midWidth - c)) / float(midWidth))
                zc = zc * zc * maxZ
                # row-major index; x and y are centered on the middle of the grid
                cloud[r*width+c] = r-midHeight, c-midWidth, zr+zc, 128, 0
        SetImageMatr('Image3d',cloud)
        sm.Write(cloud)
    except Exception, msg:
        print msg
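The bytesPerPixel value of 20 used for the 3D image is consistent with each point carrying five 4-byte values, matching the five components assigned per point in the loop. That reading is an inference, not something stated here. The sketch below checks the arithmetic with Python's `struct` module, assuming little-endian 32-bit floats; `POINT` and `pack_point` are hypothetical names.

```python
import struct

# Assumed layout of one point: x, y, z, w, v as little-endian 32-bit floats.
POINT = struct.Struct("<5f")


def pack_point(x, y, z, w, v):
    """Serialize one point into its assumed 20-byte representation."""
    return POINT.pack(x, y, z, w, v)


point_bytes = pack_point(-50.0, -50.0, 0.0, 128.0, 0.0)
bytes_per_point = POINT.size
```

If the real point type uses a different component size or count, the bytesPerPixel argument would have to change accordingly.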
Example 6: Using SMImageClient to get a 3D image
# Central Start
import SharedMemoryClient
smc3d = None
smc3d = SharedMemoryClient.SMImageClient("test", 760, 570, 20)
if smc3d.state == SharedMemoryClient.SMImageClient.State.Disconnected:
    smc3d.Connect()

# ReceiveImage
def ReceiveImage():
    # Call this function from trigger every 100ms
    if smc3d != None:
        obj = smc3d.Read()
        if obj != None and str(type(obj)) == "":
            SetImageMatr('Image3d', obj)
        if smc3d.state == SharedMemoryClient.SMImageClient.State.Disconnected:
            try:
                # try to reconnect. This is needed if read actually detected a disconnect
                smc3d.Connect()
            except:
                pass