6
6
import org .reactivecommons .api .domain .Command ;
7
7
import org .reactivecommons .api .domain .DomainEvent ;
8
8
import org .reactivecommons .async .api .AsyncQuery ;
9
- import org .reactivecommons .async .rabbit .RabbitMessage ;
10
9
import org .reactivecommons .async .commons .communications .Message ;
11
10
import org .reactivecommons .async .commons .converters .MessageConverter ;
12
11
import org .reactivecommons .async .commons .exceptions .MessageConversionException ;
12
+ import org .reactivecommons .async .rabbit .RabbitMessage ;
13
13
14
14
import java .io .IOException ;
15
+ import java .io .UnsupportedEncodingException ;
15
16
import java .nio .charset .Charset ;
17
+ import java .nio .charset .StandardCharsets ;
16
18
17
19
public class JacksonMessageConverter implements MessageConverter {
18
20
private static final String ENCODING = Charset .defaultCharset ().name ();
@@ -28,7 +30,7 @@ public JacksonMessageConverter(ObjectMapper objectMapper) {
28
30
@ Override
29
31
public <T > AsyncQuery <T > readAsyncQuery (Message message , Class <T > bodyClass ) {
30
32
try {
31
- final AsyncQueryJson asyncQueryJson = objectMapper . readValue (message . getBody () , AsyncQueryJson .class );
33
+ final AsyncQueryJson asyncQueryJson = readValue (message , AsyncQueryJson .class );
32
34
final T value = objectMapper .treeToValue (asyncQueryJson .getQueryData (), bodyClass );
33
35
return new AsyncQuery <>(asyncQueryJson .getResource (), value );
34
36
} catch (IOException e ) {
@@ -39,7 +41,7 @@ public <T> AsyncQuery<T> readAsyncQuery(Message message, Class<T> bodyClass) {
39
41
@ Override
40
42
public <T > DomainEvent <T > readDomainEvent (Message message , Class <T > bodyClass ) {
41
43
try {
42
- final DomainEventJson domainEventJson = objectMapper . readValue (message . getBody () , DomainEventJson .class );
44
+ final DomainEventJson domainEventJson = readValue (message , DomainEventJson .class );
43
45
final T value = objectMapper .treeToValue (domainEventJson .getData (), bodyClass );
44
46
return new DomainEvent <>(domainEventJson .getName (), domainEventJson .getEventId (), value );
45
47
} catch (IOException e ) {
@@ -50,7 +52,7 @@ public <T> DomainEvent<T> readDomainEvent(Message message, Class<T> bodyClass) {
50
52
@ Override
51
53
public <T > Command <T > readCommand (Message message , Class <T > bodyClass ) {
52
54
try {
53
- final CommandJson commandJson = objectMapper . readValue (message . getBody () , CommandJson .class );
55
+ final CommandJson commandJson = readValue (message , CommandJson .class );
54
56
final T value = objectMapper .treeToValue (commandJson .getData (), bodyClass );
55
57
return new Command <>(commandJson .getName (), commandJson .getCommandId (), value );
56
58
} catch (IOException e ) {
@@ -61,7 +63,9 @@ public <T> Command<T> readCommand(Message message, Class<T> bodyClass) {
61
63
@ Override
62
64
public <T > T readValue (Message message , Class <T > valueClass ) {
63
65
try {
64
- return objectMapper .readValue (message .getBody (), valueClass );
66
+ byte [] utf8Body = ensureEncoding (message .getBody (), message .getProperties ().getContentEncoding (),
67
+ StandardCharsets .UTF_8 .name ());
68
+ return objectMapper .readValue (utf8Body , valueClass );
65
69
} catch (IOException e ) {
66
70
throw new MessageConversionException ("Failed to convert Message content" , e );
67
71
}
@@ -71,21 +75,21 @@ public <T> T readValue(Message message, Class<T> valueClass) {
71
75
@ SuppressWarnings ("unchecked" )
72
76
public <T > Command <T > readCommandStructure (Message message ) {
73
77
final CommandJson commandJson = readValue (message , CommandJson .class );
74
- return new Command <>(commandJson .getName (), commandJson .getCommandId (), (T )commandJson .getData ());
78
+ return new Command <>(commandJson .getName (), commandJson .getCommandId (), (T ) commandJson .getData ());
75
79
}
76
80
77
81
@ Override
78
82
@ SuppressWarnings ("unchecked" )
79
83
public <T > DomainEvent <T > readDomainEventStructure (Message message ) {
80
84
final DomainEventJson eventJson = readValue (message , DomainEventJson .class );
81
- return new DomainEvent <>(eventJson .getName (), eventJson .getEventId (), (T )eventJson .getData ());
85
+ return new DomainEvent <>(eventJson .getName (), eventJson .getEventId (), (T ) eventJson .getData ());
82
86
}
83
87
84
88
@ Override
85
89
@ SuppressWarnings ("unchecked" )
86
90
public <T > AsyncQuery <T > readAsyncQueryStructure (Message message ) {
87
91
final AsyncQueryJson asyncQueryJson = readValue (message , AsyncQueryJson .class );
88
- return new AsyncQuery <>(asyncQueryJson .getResource (), (T )asyncQueryJson .getQueryData ());
92
+ return new AsyncQuery <>(asyncQueryJson .getResource (), (T ) asyncQueryJson .getQueryData ());
89
93
}
90
94
91
95
@ Override
@@ -94,8 +98,7 @@ public Message toMessage(Object object) {
94
98
try {
95
99
String jsonString = this .objectMapper .writeValueAsString (object );
96
100
bytes = jsonString .getBytes (ENCODING );
97
- }
98
- catch (IOException e ) {
101
+ } catch (IOException e ) {
99
102
throw new MessageConversionException ("Failed to convert Message content" , e );
100
103
}
101
104
RabbitMessage .RabbitMessageProperties props = new RabbitMessage .RabbitMessageProperties ();
@@ -105,6 +108,14 @@ public Message toMessage(Object object) {
105
108
return new RabbitMessage (bytes , props );
106
109
}
107
110
111
+ private byte [] ensureEncoding (byte [] data , String fromEncoding , String toEncoding )
112
+ throws UnsupportedEncodingException {
113
+ if (fromEncoding .equalsIgnoreCase (toEncoding )) {
114
+ return data ;
115
+ }
116
+ return new String (data , fromEncoding ).getBytes (toEncoding );
117
+ }
118
+
108
119
@ Data
109
120
private static class AsyncQueryJson {
110
121
private String resource ;
0 commit comments